From f2f9b45aa6e175b63417b946d5980524933e91ed Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Thu, 5 Jun 2025 23:23:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=94=B9Mock=E5=A4=A7=E5=86=99?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E5=A4=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/devices/mock/__init__.py | 0 unilabos/devices/mock/mock_chiller.py | 170 +++++++ unilabos/devices/mock/mock_filter.py | 170 +++++++ unilabos/devices/mock/mock_heater.py | 202 ++++++++ unilabos/devices/mock/mock_pump.py | 414 ++++++++++++++++ unilabos/devices/mock/mock_rotavap.py | 443 +++++++++++++++++ unilabos/devices/mock/mock_separator.py | 184 +++++++ unilabos/devices/mock/mock_solenoid_valve.py | 89 ++++ unilabos/devices/mock/mock_stirrer.py | 482 +++++++++++++++++++ unilabos/devices/mock/mock_vacuum.py | 410 ++++++++++++++++ 10 files changed, 2564 insertions(+) create mode 100644 unilabos/devices/mock/__init__.py create mode 100644 unilabos/devices/mock/mock_chiller.py create mode 100644 unilabos/devices/mock/mock_filter.py create mode 100644 unilabos/devices/mock/mock_heater.py create mode 100644 unilabos/devices/mock/mock_pump.py create mode 100644 unilabos/devices/mock/mock_rotavap.py create mode 100644 unilabos/devices/mock/mock_separator.py create mode 100644 unilabos/devices/mock/mock_solenoid_valve.py create mode 100644 unilabos/devices/mock/mock_stirrer.py create mode 100644 unilabos/devices/mock/mock_vacuum.py diff --git a/unilabos/devices/mock/__init__.py b/unilabos/devices/mock/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/unilabos/devices/mock/mock_chiller.py b/unilabos/devices/mock/mock_chiller.py new file mode 100644 index 0000000..46bb7f0 --- /dev/null +++ b/unilabos/devices/mock/mock_chiller.py @@ -0,0 +1,170 @@ +import time +import threading + + +class MockChiller: + def __init__(self, port: str = "MOCK"): + self.port = port + self._current_temperature: float = 25.0 # 室温开始 + self._target_temperature: float = 25.0 + self._status: str = "Idle" + self._is_cooling: bool = False + self._is_heating: bool = False + self._power_on: bool = False + + # 模拟温度变化的线程 + self._temperature_thread = None + self._running = False + + @property + def current_temperature(self) -> float: + """当前温度 - 会被自动识别的设备属性""" + return self._current_temperature + + @property + def target_temperature(self) -> float: + """目标温度""" + return self._target_temperature + + @property + def status(self) -> str: + """设备状态 - 会被自动识别的设备属性""" + return self._status + + @property + def power_on(self) -> bool: + """电源状态""" + return self._power_on + + @property + def is_cooling(self) -> bool: + """是否正在冷却""" + return self._is_cooling + + @property + def is_heating(self) -> bool: + """是否正在加热""" + return self._is_heating + + def set_temperature(self, temperature: float): + """设置目标温度 - 需要在注册表添加的设备动作""" + if not self._power_on: + self._status = "Error: Power Off" + return False + + # 将传入温度转换为 float,并限制在允许范围内 + temperature = float(temperature) + self._target_temperature = temperature + + # 立即更新状态 + diff = self._target_temperature - self._current_temperature + if abs(diff) < 0.1: + self._status = "At Target Temperature" + self._is_cooling = False + self._is_heating = False + elif diff < 0: + self._status = "Cooling" + self._is_cooling = True + self._is_heating = False + else: + self._status = "Heating" + self._is_heating = True + self._is_cooling = False + + # 启动温度控制 + self._start_temperature_control() + return True + + def power_on_off(self, power_state: str): + """开关机控制""" + if power_state == "on": + self._power_on = True + # 不在这里直接设置状态和加热/制冷标志 + self._start_temperature_control() + else: + self._power_on = False + self._status = "Power Off" + self._stop_temperature_control() + self._is_cooling = False + self._is_heating = False + + def _start_temperature_control(self): + """启动温度控制线程""" + if self._power_on: # 移除 not self._running 检查 + self._running = True + if self._temperature_thread is None or not self._temperature_thread.is_alive(): + self._temperature_thread = threading.Thread(target=self._temperature_control_loop) + self._temperature_thread.daemon = True + self._temperature_thread.start() + + def _stop_temperature_control(self): + """停止温度控制""" + self._running = False + if self._temperature_thread: + self._temperature_thread.join(timeout=1.0) + + def _temperature_control_loop(self): + """温度控制循环 - 模拟真实冷却器的温度变化""" + while self._running and self._power_on: + temp_diff = self._target_temperature - self._current_temperature + + if abs(temp_diff) < 0.1: # 将判断范围从0.5改小到0.1 + self._status = "At Target Temperature" + self._is_cooling = False + self._is_heating = False + elif temp_diff < 0: # 需要冷却 + self._status = "Cooling" + self._is_cooling = True + self._is_heating = False + # 模拟冷却过程,每秒降低0.5度 + self._current_temperature -= 0.5 + else: # 需要加热 + self._status = "Heating" + self._is_heating = True + self._is_cooling = False + # 模拟加热过程,每秒升高0.3度 + self._current_temperature += 0.3 + + # 限制温度范围 + self._current_temperature = max(-20.0, min(80.0, self._current_temperature)) + + time.sleep(1.0) # 每秒更新一次 + + def emergency_stop(self): + """紧急停止""" + self._status = "Emergency Stop" + self._stop_temperature_control() + self._is_cooling = False + self._is_heating = False + + def get_status_info(self) -> dict: + """获取完整状态信息""" + return { + "current_temperature": self._current_temperature, + "target_temperature": self._target_temperature, + "status": self._status, + "power_on": self._power_on, + "is_cooling": self._is_cooling, + "is_heating": self._is_heating, + } + + +# 用于测试的主函数 +if __name__ == "__main__": + chiller = MockChiller() + + # 测试基本功能 + print("启动冷却器测试...") + chiller.power_on_off("on") + print(f"初始状态: {chiller.get_status_info()}") + + # 设置目标温度为5度 + chiller.set_temperature(5.0) + + # 模拟运行10秒 + for i in range(10): + time.sleep(1) + print(f"第{i+1}秒: 当前温度={chiller.current_temperature:.1f}°C, 状态={chiller.status}") + + chiller.emergency_stop() + print("测试完成") diff --git a/unilabos/devices/mock/mock_filter.py b/unilabos/devices/mock/mock_filter.py new file mode 100644 index 0000000..6581ac4 --- /dev/null +++ b/unilabos/devices/mock/mock_filter.py @@ -0,0 +1,170 @@ +import time +import threading + + +class MockFilter: + def __init__(self, port: str = "MOCK"): + self.port = port + self._status: str = "Idle" + self._is_filtering: bool = False + self._filter_efficiency: float = 95.0 # 过滤效率百分比 + self._flow_rate: float = 0.0 # 流速 L/min + self._pressure_drop: float = 0.0 # 压降 Pa + self._filter_life: float = 100.0 # 滤芯寿命百分比 + self._power_on: bool = False + + # 模拟过滤过程的线程 + self._filter_thread = None + self._running = False + + @property + def status(self) -> str: + """设备状态 - 会被自动识别的设备属性""" + return self._status + + @property + def is_filtering(self) -> bool: + """是否正在过滤""" + return self._is_filtering + + @property + def filter_efficiency(self) -> float: + """过滤效率""" + return self._filter_efficiency + + @property + def flow_rate(self) -> float: + """流速""" + return self._flow_rate + + @property + def pressure_drop(self) -> float: + """压降""" + return self._pressure_drop + + @property + def filter_life(self) -> float: + """滤芯寿命""" + return self._filter_life + + @property + def power_on(self) -> bool: + """电源状态""" + return self._power_on + + def start_filtering(self, flow_rate: float = 1.0): + """开始过滤 - 需要在注册表添加的设备动作""" + if not self._power_on: + self._status = "Error: Power Off" + return False + + self._flow_rate = flow_rate + self._status = "Starting Filter" + self._start_filter_process() + return True + + def stop_filtering(self): + """停止过滤""" + self._status = "Stopping Filter" + self._stop_filter_process() + self._flow_rate = 0.0 + self._is_filtering = False + self._status = "Idle" + return True + + def power_on_off(self, power_state: str): + """开关机控制""" + if power_state == "on": + self._power_on = True + self._status = "Power On" + else: + self._power_on = False + self._status = "Power Off" + self._stop_filter_process() + self._is_filtering = False + self._flow_rate = 0.0 + + def replace_filter(self): + """更换滤芯""" + self._filter_life = 100.0 + self._filter_efficiency = 95.0 + self._status = "Filter Replaced" + return True + + def _start_filter_process(self): + """启动过滤过程线程""" + if not self._running and self._power_on: + self._running = True + self._is_filtering = True + self._filter_thread = threading.Thread(target=self._filter_loop) + self._filter_thread.daemon = True + self._filter_thread.start() + + def _stop_filter_process(self): + """停止过滤过程""" + self._running = False + if self._filter_thread: + self._filter_thread.join(timeout=1.0) + + def _filter_loop(self): + """过滤过程循环 - 模拟真实过滤器的工作过程""" + while self._running and self._power_on and self._is_filtering: + self._status = "Filtering" + + # 模拟滤芯磨损 + if self._filter_life > 0: + self._filter_life -= 0.1 # 每秒减少0.1%寿命 + + # 根据滤芯寿命调整效率和压降 + life_factor = self._filter_life / 100.0 + self._filter_efficiency = 95.0 * life_factor + 50.0 * (1 - life_factor) + self._pressure_drop = 100.0 + (200.0 * (1 - life_factor)) # 压降随磨损增加 + + # 检查滤芯是否需要更换 + if self._filter_life <= 10.0: + self._status = "Filter Needs Replacement" + + time.sleep(1.0) # 每秒更新一次 + + def emergency_stop(self): + """紧急停止""" + self._status = "Emergency Stop" + self._stop_filter_process() + self._is_filtering = False + self._flow_rate = 0.0 + + def get_status_info(self) -> dict: + """获取完整状态信息""" + return { + "status": self._status, + "is_filtering": self._is_filtering, + "filter_efficiency": self._filter_efficiency, + "flow_rate": self._flow_rate, + "pressure_drop": self._pressure_drop, + "filter_life": self._filter_life, + "power_on": self._power_on, + } + + +# 用于测试的主函数 +if __name__ == "__main__": + filter_device = MockFilter() + + # 测试基本功能 + print("启动过滤器测试...") + filter_device.power_on_off("on") + print(f"初始状态: {filter_device.get_status_info()}") + + # 开始过滤 + filter_device.start_filtering(2.0) + + # 模拟运行10秒 + for i in range(10): + time.sleep(1) + print( + f"第{i+1}秒: 效率={filter_device.filter_efficiency:.1f}%, " + f"寿命={filter_device.filter_life:.1f}%, 状态={filter_device.status}" + ) + + filter_device.emergency_stop() + print("测试完成") diff --git a/unilabos/devices/mock/mock_heater.py b/unilabos/devices/mock/mock_heater.py new file mode 100644 index 0000000..6d9abff --- /dev/null +++ b/unilabos/devices/mock/mock_heater.py @@ -0,0 +1,202 @@ +import time +import threading + + +class MockHeater: + def __init__(self, port: str = "MOCK"): + self.port = port + self._current_temperature: float = 25.0 # 室温开始 + self._target_temperature: float = 25.0 + self._status: str = "Idle" + self._is_heating: bool = False + self._power_on: bool = False + self._heating_power: float = 0.0 # 加热功率百分比 0-100 + self._max_temperature: float = 300.0 # 最大加热温度 + + # 模拟加热过程的线程 + self._heating_thread = None + self._running = False + + @property + def current_temperature(self) -> float: + """当前温度 - 会被自动识别的设备属性""" + return self._current_temperature + + @property + def target_temperature(self) -> float: + """目标温度""" + return self._target_temperature + + @property + def status(self) -> str: + """设备状态 - 会被自动识别的设备属性""" + return self._status + + @property + def power_on(self) -> bool: + """电源状态""" + return self._power_on + + @property + def is_heating(self) -> bool: + """是否正在加热""" + return self._is_heating + + @property + def heating_power(self) -> float: + """加热功率百分比""" + return self._heating_power + + @property + def max_temperature(self) -> float: + """最大加热温度""" + return self._max_temperature + + def set_temperature(self, temperature: float): + """设置目标温度 - 需要在注册表添加的设备动作""" + try: + temperature = float(temperature) + except ValueError: + self._status = "Error: Invalid temperature value" + return False + + if not self._power_on: + self._status = "Error: Power Off" + return False + + if temperature > self._max_temperature: + self._status = f"Error: Temperature exceeds maximum ({self._max_temperature}°C)" + return False + + self._target_temperature = temperature + self._status = "Setting Temperature" + + # 启动加热控制 + self._start_heating_control() + return True + + def set_heating_power(self, power: float): + """设置加热功率""" + try: + power = float(power) + except ValueError: + self._status = "Error: Invalid power value" + return False + + if not self._power_on: + self._status = "Error: Power Off" + return False + + self._heating_power = max(0.0, min(100.0, power)) # 限制在0-100% + return True + + def power_on_off(self, power_state: str): + """开关机控制,接收字符串命令 "On" 或 "Off" """ + power_state = power_state.capitalize() + if power_state not in ["On", "Off"]: + self._status = "Error: Invalid power state" + return "Error" + self._power_on = True if power_state == "On" else False + if self._power_on: + self._status = "Power On" + self._start_heating_control() + else: + self._status = "Power Off" + self._stop_heating_control() + self._is_heating = False + self._heating_power = 0.0 + return "Success" + + def _start_heating_control(self): + """启动加热控制线程""" + if not self._running and self._power_on: + self._running = True + self._heating_thread = threading.Thread(target=self._heating_control_loop) + self._heating_thread.daemon = True + self._heating_thread.start() + + def _stop_heating_control(self): + """停止加热控制""" + self._running = False + if self._heating_thread: + self._heating_thread.join(timeout=1.0) + + def _heating_control_loop(self): + """加热控制循环 - 模拟真实加热器的温度变化""" + while self._running and self._power_on: + temp_diff = self._target_temperature - self._current_temperature + + if abs(temp_diff) < 0.5: # 温度接近目标值 + self._status = "At Target Temperature" + self._is_heating = False + self._heating_power = 10.0 # 维持温度的最小功率 + elif temp_diff > 0: # 需要加热 + self._status = "Heating" + self._is_heating = True + # 根据温差调整加热功率 + if temp_diff > 50: + self._heating_power = 100.0 + elif temp_diff > 20: + self._heating_power = 80.0 + elif temp_diff > 10: + self._heating_power = 60.0 + else: + self._heating_power = 40.0 + + # 模拟加热过程,加热速度与功率成正比 + heating_rate = self._heating_power / 100.0 * 2.0 # 最大每秒升温2度 + self._current_temperature += heating_rate + else: # 目标温度低于当前温度,自然冷却 + self._status = "Cooling Down" + self._is_heating = False + self._heating_power = 0.0 + # 模拟自然冷却,每秒降低0.2度 + self._current_temperature -= 0.2 + + # 限制温度范围 + self._current_temperature = max(20.0, min(self._max_temperature, self._current_temperature)) + + time.sleep(1.0) # 每秒更新一次 + + def emergency_stop(self): + """紧急停止""" + self._status = "Emergency Stop" + self._stop_heating_control() + self._is_heating = False + self._heating_power = 0.0 + + def get_status_info(self) -> dict: + """获取完整状态信息""" + return { + "current_temperature": self._current_temperature, + "target_temperature": self._target_temperature, + "status": self._status, + "power_on": self._power_on, + "is_heating": self._is_heating, + "heating_power": self._heating_power, + "max_temperature": self._max_temperature, + } + + +# 用于测试的主函数 +if __name__ == "__main__": + heater = MockHeater() + + # 测试基本功能 + print("启动加热器测试...") + heater.power_on_off("On") + print(f"初始状态: {heater.get_status_info()}") + + # 设置目标温度为80度 + heater.set_temperature(80.0) + + # 模拟运行15秒 + for i in range(15): + time.sleep(1) + print( + f"第{i+1}秒: 当前温度={heater.current_temperature:.1f}°C, 功率={heater.heating_power:.1f}%, " + f"状态={heater.status}" + ) + + heater.emergency_stop() + print("测试完成") diff --git a/unilabos/devices/mock/mock_pump.py b/unilabos/devices/mock/mock_pump.py new file mode 100644 index 0000000..68acd85 --- /dev/null +++ b/unilabos/devices/mock/mock_pump.py @@ -0,0 +1,414 @@ +import time +import threading + + +class MockPump: + """ + 模拟泵设备类 + + 这个类模拟了一个实验室泵设备的行为,包括流量控制、压力监测、 + 运行状态管理等功能。所有的控制参数都使用字符串类型以提供更好的 + 可读性和扩展性。 + """ + + def __init__(self, port: str = "MOCK"): + """ + 初始化MockPump实例 + + Args: + port (str): 设备端口,默认为"MOCK"表示模拟设备 + """ + self.port = port + + # 设备基本状态属性 + self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped + self._power_state: str = "Off" # 电源状态:On, Off + self._pump_state: str = "Stopped" # 泵运行状态:Running, Stopped, Paused + + # 流量相关属性 + self._flow_rate: float = 0.0 # 当前流速 (mL/min) + self._target_flow_rate: float = 0.0 # 目标流速 (mL/min) + self._max_flow_rate: float = 100.0 # 最大流速 (mL/min) + self._total_volume: float = 0.0 # 累计流量 (mL) + + # 压力相关属性 + self._pressure: float = 0.0 # 当前压力 (bar) + self._max_pressure: float = 10.0 # 最大压力 (bar) + + # 方向控制属性 + self._direction: str = "Forward" # 泵方向:Forward, Reverse + + # 运行控制线程 + self._pump_thread = None + self._running = False + self._thread_lock = threading.Lock() + + # ==================== 状态属性 ==================== + # 这些属性会被Uni-Lab系统自动识别并定时对外广播 + + @property + def status(self) -> str: + return self._status + + @property + def power_state(self) -> str: + return self._power_state + + @property + def pump_state(self) -> str: + return self._pump_state + + @property + def flow_rate(self) -> float: + return self._flow_rate + + @property + def target_flow_rate(self) -> float: + return self._target_flow_rate + + @property + def pressure(self) -> float: + return self._pressure + + @property + def total_volume(self) -> float: + return self._total_volume + + @property + def direction(self) -> str: + return self._direction + + @property + def max_flow_rate(self) -> float: + return self._max_flow_rate + + @property + def max_pressure(self) -> float: + return self._max_pressure + + # ==================== 设备控制方法 ==================== + # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 + + def power_control(self, power_state: str = "On") -> str: + """ + 电源控制方法 + + Args: + power_state (str): 电源状态,可选值:"On", "Off" + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if power_state not in ["On", "Off"]: + self._status = "Error: Invalid power state" + return "Error" + + self._power_state = power_state + + if power_state == "On": + self._status = "Power On" + else: + self._status = "Power Off" + # 关机时停止所有运行 + self.stop_pump() + + return "Success" + + def set_flow_rate(self, flow_rate: float) -> str: + """ + 设置目标流速 + + Args: + flow_rate (float): 目标流速 (mL/min) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + try: + flow_rate = float(flow_rate) + except ValueError: + self._status = "Error: Invalid flow rate" + return "Error" + + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if flow_rate < 0 or flow_rate > self._max_flow_rate: + self._status = f"Error: Flow rate out of range (0-{self._max_flow_rate})" + return "Error" + + self._target_flow_rate = flow_rate + self._status = "Setting Flow Rate" + + # 如果设置了非零流速,自动启动泵 + if flow_rate > 0: + # 自动切换泵状态为 "Running" 以触发操作循环 + self._pump_state = "Running" + self._start_pump_operation() + else: + self.stop_pump() + + return "Success" + + def start_pump(self) -> str: + """ + 启动泵运行 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if self._target_flow_rate <= 0: + self._status = "Error: No target flow rate set" + return "Error" + + self._pump_state = "Running" + self._status = "Starting Pump" + self._start_pump_operation() + + return "Success" + + def stop_pump(self) -> str: + """ + 停止泵运行 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._pump_state = "Stopped" + self._status = "Stopping Pump" + self._stop_pump_operation() + self._flow_rate = 0.0 + self._pressure = 0.0 + + return "Success" + + def pause_pump(self) -> str: + """ + 暂停泵运行 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._pump_state != "Running": + self._status = "Error: Pump not running" + return "Error" + + self._pump_state = "Paused" + self._status = "Pump Paused" + self._stop_pump_operation() + + return "Success" + + def resume_pump(self) -> str: + """ + 恢复泵运行 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._pump_state != "Paused": + self._status = "Error: Pump not paused" + return "Error" + + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + self._pump_state = "Running" + self._status = "Resuming Pump" + self._start_pump_operation() + + return "Success" + + def set_direction(self, direction: str = "Forward") -> str: + """ + 设置泵方向 + + Args: + direction (str): 泵方向,可选值:"Forward", "Reverse" + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if direction not in ["Forward", "Reverse"]: + self._status = "Error: Invalid direction" + return "Error" + + # 如果泵正在运行,需要先停止 + was_running = self._pump_state == "Running" + if was_running: + self.stop_pump() + time.sleep(0.5) # 等待停止完成 + + self._direction = direction + self._status = f"Direction set to {direction}" + + # 如果之前在运行,重新启动 + if was_running: + self.start_pump() + + return "Success" + + def reset_volume_counter(self) -> str: + """ + 重置累计流量计数器 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._total_volume = 0.0 + self._status = "Volume counter reset" + return "Success" + + def emergency_stop(self) -> str: + """ + 紧急停止 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._status = "Emergency Stop" + self._pump_state = "Stopped" + self._stop_pump_operation() + self._flow_rate = 0.0 + self._pressure = 0.0 + self._target_flow_rate = 0.0 + + return "Success" + + # ==================== 内部控制方法 ==================== + + def _start_pump_operation(self): + """ + 启动泵运行线程 + + 这个方法启动一个后台线程来模拟泵的实际运行过程, + 包括流速控制、压力变化和累计流量计算。 + """ + with self._thread_lock: + if not self._running and self._power_state == "On": + self._running = True + self._pump_thread = threading.Thread(target=self._pump_operation_loop) + self._pump_thread.daemon = True + self._pump_thread.start() + + def _stop_pump_operation(self): + """ + 停止泵运行线程 + + 安全地停止后台运行线程并等待其完成。 + """ + with self._thread_lock: + self._running = False + if self._pump_thread and self._pump_thread.is_alive(): + self._pump_thread.join(timeout=2.0) + + def _pump_operation_loop(self): + """ + 泵运行主循环 + + 这个方法在后台线程中运行,模拟真实泵的工作过程: + 1. 逐步调整流速到目标值 + 2. 根据流速计算压力 + 3. 累计流量统计 + 4. 状态更新 + """ + while self._running and self._power_state == "On" and self._pump_state == "Running": + try: + # 模拟流速调节过程(逐步接近目标流速) + flow_diff = self._target_flow_rate - self._flow_rate + + if abs(flow_diff) < 0.1: # 流速接近目标值 + self._flow_rate = self._target_flow_rate + self._status = "At Target Flow Rate" + else: + # 模拟流速调节,每秒调整10%的差值 + adjustment = flow_diff * 0.1 + self._flow_rate += adjustment + self._status = "Adjusting Flow Rate" + + # 确保流速在合理范围内 + self._flow_rate = max(0.0, min(self._max_flow_rate, self._flow_rate)) + + # 模拟压力变化(压力与流速成正比,加上一些随机波动) + base_pressure = (self._flow_rate / self._max_flow_rate) * self._max_pressure + pressure_variation = 0.1 * base_pressure * (time.time() % 1.0 - 0.5) # ±5%波动 + self._pressure = max(0.0, base_pressure + pressure_variation) + + # 累计流量计算(每秒更新) + if self._flow_rate > 0: + volume_increment = self._flow_rate / 60.0 # 转换为mL/s + if self._direction == "Reverse": + volume_increment = -volume_increment + self._total_volume += volume_increment + + # 压力保护检查 + if self._pressure > self._max_pressure * 0.95: + self._status = "Warning: High Pressure" + + # 等待1秒后继续下一次循环 + time.sleep(1.0) + + except Exception as e: + self._status = f"Error in pump operation: {str(e)}" + break + + # 循环结束时的清理工作 + if self._pump_state == "Running": + self._status = "Idle" + + def get_status_info(self) -> dict: + """ + 获取完整的设备状态信息 + + Returns: + dict: 包含所有设备状态的字典 + """ + return { + "status": self._status, + "power_state": self._power_state, + "pump_state": self._pump_state, + "flow_rate": self._flow_rate, + "target_flow_rate": self._target_flow_rate, + "pressure": self._pressure, + "total_volume": self._total_volume, + "direction": self._direction, + "max_flow_rate": self._max_flow_rate, + "max_pressure": self._max_pressure, + } + + +# 用于测试的主函数 +if __name__ == "__main__": + pump = MockPump() + + # 测试基本功能 + print("启动泵设备测试...") + pump.power_control("On") + print(f"初始状态: {pump.get_status_info()}") + + # 设置流速并启动 + pump.set_flow_rate(50.0) + pump.start_pump() + + # 模拟运行10秒 + for i in range(10): + time.sleep(1) + print(f"第{i+1}秒: 流速={pump.flow_rate:.1f}mL/min, 压力={pump.pressure:.2f}bar, 状态={pump.status}") + + # 测试方向切换 + print("切换泵方向...") + pump.set_direction("Reverse") + + # 继续运行5秒 + for i in range(5): + time.sleep(1) + print(f"反向第{i+1}秒: 累计流量={pump.total_volume:.1f}mL, 方向={pump.direction}") + + pump.emergency_stop() + print("测试完成") diff --git a/unilabos/devices/mock/mock_rotavap.py b/unilabos/devices/mock/mock_rotavap.py new file mode 100644 index 0000000..bd2f2f8 --- /dev/null +++ b/unilabos/devices/mock/mock_rotavap.py @@ -0,0 +1,443 @@ +import time +import threading +import json + + +class MockRotavap: + """ + 模拟旋转蒸发器设备类 + + 这个类模拟了一个实验室旋转蒸发器的行为,包括旋转控制、 + 真空泵控制、温度控制等功能。参考了现有的 RotavapOne 实现。 + """ + + def __init__(self, port: str = "MOCK"): + """ + 初始化MockRotavap实例 + + Args: + port (str): 设备端口,默认为"MOCK"表示模拟设备 + """ + self.port = port + + # 设备基本状态属性 + self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped + self._power_state: str = "Off" # 电源状态:On, Off + + # 旋转相关属性 + self._rotate_state: str = "Stopped" # 旋转状态:Running, Stopped + self._rotate_time: float = 0.0 # 旋转剩余时间 (秒) + self._rotate_speed: float = 0.0 # 旋转速度 (rpm) + self._max_rotate_speed: float = 300.0 # 最大旋转速度 (rpm) + + # 真空泵相关属性 + self._pump_state: str = "Stopped" # 泵状态:Running, Stopped + self._pump_time: float = 0.0 # 泵剩余时间 (秒) + self._vacuum_level: float = 0.0 # 真空度 (mbar) + self._target_vacuum: float = 50.0 # 目标真空度 (mbar) + + # 温度相关属性 + self._temperature: float = 25.0 # 水浴温度 (°C) + self._target_temperature: float = 25.0 # 目标温度 (°C) + self._max_temperature: float = 180.0 # 最大温度 (°C) + + # 运行控制线程 + self._operation_thread = None + self._running = False + self._thread_lock = threading.Lock() + + # 操作成功标志 + self.success: str = "True" # 使用字符串而不是布尔值 + + # ==================== 状态属性 ==================== + # 这些属性会被Uni-Lab系统自动识别并定时对外广播 + + @property + def status(self) -> str: + return self._status + + @property + def power_state(self) -> str: + return self._power_state + + @property + def rotate_state(self) -> str: + return self._rotate_state + + @property + def rotate_time(self) -> float: + return self._rotate_time + + @property + def rotate_speed(self) -> float: + return self._rotate_speed + + @property + def pump_state(self) -> str: + return self._pump_state + + @property + def pump_time(self) -> float: + return self._pump_time + + @property + def vacuum_level(self) -> float: + return self._vacuum_level + + @property + def temperature(self) -> float: + return self._temperature + + @property + def target_temperature(self) -> float: + return self._target_temperature + + # ==================== 设备控制方法 ==================== + # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 + + def power_control(self, power_state: str = "On") -> str: + """ + 电源控制方法 + + Args: + power_state (str): 电源状态,可选值:"On", "Off" + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if power_state not in ["On", "Off"]: + self._status = "Error: Invalid power state" + self.success = "False" + return "Error" + + self._power_state = power_state + + if power_state == "On": + self._status = "Power On" + self._start_operation() + else: + self._status = "Power Off" + self.stop_all_operations() + + self.success = "True" + return "Success" + + def set_timer(self, command: str) -> str: + """ + 设置定时器 - 兼容现有RotavapOne接口 + + Args: + command (str): JSON格式的命令字符串,包含rotate_time和pump_time + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + self.success = "False" + return "Error" + + try: + timer = json.loads(command) + rotate_time = timer.get("rotate_time", 0) + pump_time = timer.get("pump_time", 0) + + self.success = "False" + self._rotate_time = float(rotate_time) + self._pump_time = float(pump_time) + self.success = "True" + + self._status = "Timer Set" + return "Success" + + except (json.JSONDecodeError, ValueError, KeyError) as e: + self._status = f"Error: Invalid command format - {str(e)}" + self.success = "False" + return "Error" + + def set_rotate_time(self, time_seconds: float) -> str: + """ + 设置旋转时间 + + Args: + time_seconds (float): 旋转时间 (秒) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + self.success = "False" + self._rotate_time = max(0.0, float(time_seconds)) + self.success = "True" + self._status = "Rotate time set" + return "Success" + + def set_pump_time(self, time_seconds: float) -> str: + """ + 设置泵时间 + + Args: + time_seconds (float): 泵时间 (秒) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + self.success = "False" + self._pump_time = max(0.0, float(time_seconds)) + self.success = "True" + self._status = "Pump time set" + return "Success" + + def set_rotate_speed(self, speed: float) -> str: + """ + 设置旋转速度 + + Args: + speed (float): 旋转速度 (rpm) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if speed < 0 or speed > self._max_rotate_speed: + self._status = f"Error: Speed out of range (0-{self._max_rotate_speed})" + return "Error" + + self._rotate_speed = speed + self._status = "Rotate speed set" + return "Success" + + def set_temperature(self, temperature: float) -> str: + """ + 设置水浴温度 + + Args: + temperature (float): 目标温度 (°C) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if temperature < 0 or temperature > self._max_temperature: + self._status = f"Error: Temperature out of range (0-{self._max_temperature})" + return "Error" + + self._target_temperature = temperature + self._status = "Temperature set" + return "Success" + + def start_rotation(self) -> str: + """ + 启动旋转 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if self._rotate_time <= 0: + self._status = "Error: No rotate time set" + return "Error" + + self._rotate_state = "Running" + self._status = "Rotation started" + return "Success" + + def start_pump(self) -> str: + """ + 启动真空泵 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if self._pump_time <= 0: + self._status = "Error: No pump time set" + return "Error" + + self._pump_state = "Running" + self._status = "Pump started" + return "Success" + + def stop_all_operations(self) -> str: + """ + 停止所有操作 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._rotate_state = "Stopped" + self._pump_state = "Stopped" + self._stop_operation() + self._rotate_time = 0.0 + self._pump_time = 0.0 + self._vacuum_level = 0.0 + self._status = "All operations stopped" + return "Success" + + def emergency_stop(self) -> str: + """ + 紧急停止 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._status = "Emergency Stop" + self.stop_all_operations() + return "Success" + + # ==================== 内部控制方法 ==================== + + def _start_operation(self): + """ + 启动操作线程 + + 这个方法启动一个后台线程来模拟旋蒸的实际运行过程。 + """ + with self._thread_lock: + if not self._running and self._power_state == "On": + self._running = True + self._operation_thread = threading.Thread(target=self._operation_loop) + self._operation_thread.daemon = True + self._operation_thread.start() + + def _stop_operation(self): + """ + 停止操作线程 + + 安全地停止后台运行线程并等待其完成。 + """ + with self._thread_lock: + self._running = False + if self._operation_thread and self._operation_thread.is_alive(): + self._operation_thread.join(timeout=2.0) + + def _operation_loop(self): + """ + 操作主循环 + + 这个方法在后台线程中运行,模拟真实旋蒸的工作过程: + 1. 时间倒计时 + 2. 温度控制 + 3. 真空度控制 + 4. 状态更新 + """ + while self._running and self._power_state == "On": + try: + # 处理旋转时间倒计时 + if self._rotate_time > 0: + self._rotate_state = "Running" + self._rotate_time = max(0.0, self._rotate_time - 1.0) + else: + self._rotate_state = "Stopped" + + # 处理泵时间倒计时 + if self._pump_time > 0: + self._pump_state = "Running" + self._pump_time = max(0.0, self._pump_time - 1.0) + # 模拟真空度变化 + if self._vacuum_level > self._target_vacuum: + self._vacuum_level = max(self._target_vacuum, self._vacuum_level - 5.0) + else: + self._pump_state = "Stopped" + # 真空度逐渐回升 + self._vacuum_level = min(1013.25, self._vacuum_level + 2.0) + + # 模拟温度控制 + temp_diff = self._target_temperature - self._temperature + if abs(temp_diff) > 0.5: + if temp_diff > 0: + self._temperature += min(1.0, temp_diff * 0.1) + else: + self._temperature += max(-1.0, temp_diff * 0.1) + + # 更新整体状态 + if self._rotate_state == "Running" or self._pump_state == "Running": + self._status = "Operating" + elif self._rotate_time > 0 or self._pump_time > 0: + self._status = "Ready" + else: + self._status = "Idle" + + # 等待1秒后继续下一次循环 + time.sleep(1.0) + + except Exception as e: + self._status = f"Error in operation: {str(e)}" + break + + # 循环结束时的清理工作 + if self._power_state == "On": + self._status = "Idle" + + def get_status_info(self) -> dict: + """ + 获取完整的设备状态信息 + + Returns: + dict: 包含所有设备状态的字典 + """ + return { + "status": self._status, + "power_state": self._power_state, + "rotate_state": self._rotate_state, + "rotate_time": self._rotate_time, + "rotate_speed": self._rotate_speed, + "pump_state": self._pump_state, + "pump_time": self._pump_time, + "vacuum_level": self._vacuum_level, + "temperature": self._temperature, + "target_temperature": self._target_temperature, + "success": self.success, + } + + +# 用于测试的主函数 +if __name__ == "__main__": + rotavap = MockRotavap() + + # 测试基本功能 + print("启动旋转蒸发器测试...") + rotavap.power_control("On") + print(f"初始状态: {rotavap.get_status_info()}") + + # 设置定时器 + timer_command = '{"rotate_time": 300, "pump_time": 600}' + rotavap.set_timer(timer_command) + + # 设置温度和转速 + rotavap.set_temperature(60.0) + rotavap.set_rotate_speed(120.0) + + # 启动操作 + rotavap.start_rotation() + rotavap.start_pump() + + # 模拟运行10秒 + for i in range(10): + time.sleep(1) + print( + f"第{i+1}秒: 旋转={rotavap.rotate_time:.0f}s, 泵={rotavap.pump_time:.0f}s, " + f"温度={rotavap.temperature:.1f}°C, 真空={rotavap.vacuum_level:.1f}mbar" + ) + + rotavap.emergency_stop() + print("测试完成") diff --git a/unilabos/devices/mock/mock_separator.py b/unilabos/devices/mock/mock_separator.py new file mode 100644 index 0000000..fdda4ba --- /dev/null +++ b/unilabos/devices/mock/mock_separator.py @@ -0,0 +1,184 @@ +import time +import threading + + +class MockSeparator: + def __init__(self, port: str = "MOCK"): + self.port = port + + # 基本状态属性 + self._power_state: str = "Off" # 电源:On 或 Off + self._status: str = "Idle" # 当前总体状态 + self._valve_state: str = "Closed" # 阀门状态:Open 或 Closed + self._settling_time: float = 0.0 # 静置时间(秒) + + # 搅拌相关属性 + self._shake_time: float = 0.0 # 剩余摇摆时间(秒) + self._shake_status: str = "Not Shaking" # 摇摆状态 + + # 用于后台模拟 shake 动作 + self._operation_thread = None + self._thread_lock = threading.Lock() + + @property + def power_state(self) -> str: + return self._power_state + + @property + def valve_state(self) -> str: + return self._valve_state + + @property + def settling_time(self) -> float: + return self._settling_time + + @property + def status(self) -> str: + return self._status + + @property + def shake_time(self) -> float: + with self._thread_lock: + return self._shake_time + + @property + def shake_status(self) -> str: + with self._thread_lock: + return self._shake_status + + def power_control(self, power_state: str) -> str: + """ + 电源控制:只接受 "On" 或 "Off" + """ + if power_state not in ["On", "Off"]: + self._status = "Error: Invalid power state" + return "Error" + + self._power_state = power_state + if power_state == "On": + self._status = "Powered On" + else: + self._status = "Powered Off" + self.stop_operations() + return "Success" + + def shake(self, shake_time: float) -> str: + """ + 模拟 shake(搅拌)操作: + - 进入 "Shaking" 状态,倒计时 shake_time 秒 + - shake 结束后,进入 "Settling" 状态,静置时间固定为 5 秒 + - 最后恢复为 Idle + """ + try: + shake_time = float(shake_time) + except ValueError: + self._status = "Error: Invalid shake time" + return "Error" + + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + with self._thread_lock: + self._status = "Shaking" + self._settling_time = 0.0 + self._shake_time = shake_time + self._shake_status = "Shaking" + + def _run_shake(): + remaining = shake_time + while remaining > 0: + time.sleep(1) + remaining -= 1 + with self._thread_lock: + self._shake_time = remaining + with self._thread_lock: + self._status = "Settling" + self._settling_time = 60.0 # 固定静置时间为60秒 + self._shake_status = "Settling" + while True: + with self._thread_lock: + if self._settling_time <= 0: + self._status = "Idle" + self._shake_status = "Idle" + break + time.sleep(1) + with self._thread_lock: + self._settling_time = max(0.0, self._settling_time - 1) + + self._operation_thread = threading.Thread(target=_run_shake) + self._operation_thread.daemon = True + self._operation_thread.start() + return "Success" + + def set_valve(self, command: str) -> str: + """ + 阀门控制命令:传入 "open" 或 "close" + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + command = command.lower() + if command == "open": + self._valve_state = "Open" + self._status = "Valve Opened" + elif command == "close": + self._valve_state = "Closed" + self._status = "Valve Closed" + else: + self._status = "Error: Invalid valve command" + return "Error" + return "Success" + + def stop_operations(self) -> str: + """ + 停止任何正在执行的操作 + """ + with self._thread_lock: + self._settling_time = 0.0 + self._status = "Idle" + self._shake_status = "Idle" + self._shake_time = 0.0 + return "Success" + + def get_status_info(self) -> dict: + """ + 获取当前设备状态信息 + """ + with self._thread_lock: + return { + "status": self._status, + "power_state": self._power_state, + "valve_state": self._valve_state, + "settling_time": self._settling_time, + "shake_time": self._shake_time, + "shake_status": self._shake_status, + } + + +# 主函数用于测试 +if __name__ == "__main__": + separator = MockSeparator() + + print("启动简单版分离器测试...") + print(separator.power_control("On")) + print("初始状态:", separator.get_status_info()) + + # 触发 shake 操作,模拟 10 秒的搅拌 + print("执行 shake 操作...") + print(separator.shake(10.0)) + + # 循环显示状态变化 + for i in range(20): + time.sleep(1) + info = separator.get_status_info() + print( + f"第{i+1}秒: 状态={info['status']}, 静置时间={info['settling_time']:.1f}秒, " + f"阀门状态={info['valve_state']}, shake_time={info['shake_time']:.1f}, " + f"shake_status={info['shake_status']}" + ) + + # 模拟打开阀门 + print("打开阀门...", separator.set_valve("open")) + print("最终状态:", separator.get_status_info()) diff --git a/unilabos/devices/mock/mock_solenoid_valve.py b/unilabos/devices/mock/mock_solenoid_valve.py new file mode 100644 index 0000000..0f0fbe5 --- /dev/null +++ b/unilabos/devices/mock/mock_solenoid_valve.py @@ -0,0 +1,89 @@ +import time + + +class MockSolenoidValve: + """ + 模拟电磁阀设备类 - 简化版本 + + 这个类提供了电磁阀的基本功能:开启、关闭和状态查询 + """ + + def __init__(self, port: str = "MOCK"): + """ + 初始化MockSolenoidValve实例 + + Args: + port (str): 设备端口,默认为"MOCK"表示模拟设备 + """ + self.port = port + self._status: str = "Idle" + self._valve_status: str = "Closed" # 阀门位置:Open, Closed + + @property + def status(self) -> str: + """设备状态 - 会被自动识别的设备属性""" + return self._status + + @property + def valve_status(self) -> str: + """阀门状态""" + return self._valve_status + + def set_valve_status(self, status: str) -> str: + """ + 设置阀门位置 + + Args: + position (str): 阀门位置,可选值:"Open", "Closed" + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if status not in ["Open", "Closed"]: + self._status = "Error: Invalid position" + return "Error" + + self._status = "Moving" + time.sleep(1) # 模拟阀门动作时间 + + self._valve_status = status + self._status = "Idle" + return "Success" + + def open_valve(self) -> str: + """打开阀门""" + return self.set_valve_status("Open") + + def close_valve(self) -> str: + """关闭阀门""" + return self.set_valve_status("Closed") + + def get_valve_status(self) -> str: + """获取阀门位置""" + return self._valve_status + + def is_open(self) -> bool: + """检查阀门是否打开""" + return self._valve_status == "Open" + + def is_closed(self) -> bool: + """检查阀门是否关闭""" + return self._valve_status == "Closed" + + +# 用于测试的主函数 +if __name__ == "__main__": + valve = MockSolenoidValve() + + print("启动电磁阀测试...") + print(f"初始状态: 位置={valve.valve_status}, 状态={valve.status}") + + # 测试开启阀门 + valve.open_valve() + print(f"开启后: 位置={valve.valve_status}, 状态={valve.status}") + + # 测试关闭阀门 + valve.close_valve() + print(f"关闭后: 位置={valve.valve_status}, 状态={valve.status}") + + print("测试完成") diff --git a/unilabos/devices/mock/mock_stirrer.py b/unilabos/devices/mock/mock_stirrer.py new file mode 100644 index 0000000..6907f7e --- /dev/null +++ b/unilabos/devices/mock/mock_stirrer.py @@ -0,0 +1,482 @@ +import time +import threading + + +class MockStirrer: + """ + 模拟搅拌器设备类 + + 这个类模拟了一个实验室搅拌器的行为,包括搅拌速度控制、 + 温度监测、加热控制等功能。参考了现有的 HeaterStirrer_DaLong 实现。 + """ + + def __init__(self, port: str = "MOCK"): + """ + 初始化MockStirrer实例 + + Args: + port (str): 设备端口,默认为"MOCK"表示模拟设备 + """ + self.port = port + + # 设备基本状态属性 + self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped + self._power_state: str = "Off" # 电源状态:On, Off + + # 搅拌相关属性 + self._stir_speed: float = 0.0 # 当前搅拌速度 (rpm) + self._target_stir_speed: float = 0.0 # 目标搅拌速度 (rpm) + self._max_stir_speed: float = 2000.0 # 最大搅拌速度 (rpm) + self._stir_state: str = "Stopped" # 搅拌状态:Running, Stopped + + # 温度相关属性 + self._temperature: float = 25.0 # 当前温度 (°C) + self._target_temperature: float = 25.0 # 目标温度 (°C) + self._max_temperature: float = 300.0 # 最大温度 (°C) + self._heating_state: str = "Off" # 加热状态:On, Off + self._heating_power: float = 0.0 # 加热功率百分比 0-100 + + # 运行控制线程 + self._operation_thread = None + self._running = False + self._thread_lock = threading.Lock() + + # ==================== 状态属性 ==================== + # 这些属性会被Uni-Lab系统自动识别并定时对外广播 + + @property + def status(self) -> str: + """ + 设备状态 - 会被自动识别的设备属性 + + Returns: + str: 当前设备状态 (Idle, Running, Error, Stopped) + """ + return self._status + + @property + def power_state(self) -> str: + """ + 电源状态 + + Returns: + str: 电源状态 (On, Off) + """ + return self._power_state + + @property + def stir_speed(self) -> float: + """ + 当前搅拌速度 + + Returns: + float: 当前搅拌速度 (rpm) + """ + return self._stir_speed + + @property + def target_stir_speed(self) -> float: + """ + 目标搅拌速度 + + Returns: + float: 目标搅拌速度 (rpm) + """ + return self._target_stir_speed + + @property + def stir_state(self) -> str: + """ + 搅拌状态 + + Returns: + str: 搅拌状态 (Running, Stopped) + """ + return self._stir_state + + @property + def temperature(self) -> float: + """ + 当前温度 + + Returns: + float: 当前温度 (°C) + """ + return self._temperature + + @property + def target_temperature(self) -> float: + """ + 目标温度 + + Returns: + float: 目标温度 (°C) + """ + return self._target_temperature + + @property + def heating_state(self) -> str: + """ + 加热状态 + + Returns: + str: 加热状态 (On, Off) + """ + return self._heating_state + + @property + def heating_power(self) -> float: + """ + 加热功率 + + Returns: + float: 加热功率百分比 (0-100) + """ + return self._heating_power + + @property + def max_stir_speed(self) -> float: + """ + 最大搅拌速度 + + Returns: + float: 最大搅拌速度 (rpm) + """ + return self._max_stir_speed + + @property + def max_temperature(self) -> float: + """ + 最大温度 + + Returns: + float: 最大温度 (°C) + """ + return self._max_temperature + + # ==================== 设备控制方法 ==================== + # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 + + def power_control(self, power_state: str = "On") -> str: + """ + 电源控制方法 + + Args: + power_state (str): 电源状态,可选值:"On", "Off" + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if power_state not in ["On", "Off"]: + self._status = "Error: Invalid power state" + return "Error" + + self._power_state = power_state + + if power_state == "On": + self._status = "Power On" + self._start_operation() + else: + self._status = "Power Off" + self.stop_all_operations() + + return "Success" + + def set_stir_speed(self, speed: float) -> str: + """ + 设置搅拌速度 + + Args: + speed (float): 目标搅拌速度 (rpm) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + speed = float(speed) # 确保传入的速度是浮点数 + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if speed < 0 or speed > self._max_stir_speed: + self._status = f"Error: Speed out of range (0-{self._max_stir_speed})" + return "Error" + + self._target_stir_speed = speed + self._status = "Setting Stir Speed" + + # 如果设置了非零速度,启动搅拌 + if speed > 0: + self._stir_state = "Running" + else: + self._stir_state = "Stopped" + + return "Success" + + def set_temperature(self, temperature: float) -> str: + """ + 设置目标温度 + + Args: + temperature (float): 目标温度 (°C) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + temperature = float(temperature) # 确保传入的温度是浮点数 + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if temperature < 0 or temperature > self._max_temperature: + self._status = f"Error: Temperature out of range (0-{self._max_temperature})" + return "Error" + + self._target_temperature = temperature + self._status = "Setting Temperature" + + return "Success" + + def start_stirring(self) -> str: + """ + 启动搅拌 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if self._target_stir_speed <= 0: + self._status = "Error: No target speed set" + return "Error" + + self._stir_state = "Running" + self._status = "Stirring Started" + return "Success" + + def stop_stirring(self) -> str: + """ + 停止搅拌 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._stir_state = "Stopped" + self._target_stir_speed = 0.0 + self._status = "Stirring Stopped" + return "Success" + + def heating_control(self, heating_state: str = "On") -> str: + """ + 加热控制 + + Args: + heating_state (str): 加热状态,可选值:"On", "Off" + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if heating_state not in ["On", "Off"]: + self._status = "Error: Invalid heating state" + return "Error" + + self._heating_state = heating_state + + if heating_state == "On": + self._status = "Heating On" + else: + self._status = "Heating Off" + self._heating_power = 0.0 + + return "Success" + + def stop_all_operations(self) -> str: + """ + 停止所有操作 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._stir_state = "Stopped" + self._heating_state = "Off" + self._stop_operation() + self._stir_speed = 0.0 + self._target_stir_speed = 0.0 + self._heating_power = 0.0 + self._status = "All operations stopped" + return "Success" + + def emergency_stop(self) -> str: + """ + 紧急停止 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._status = "Emergency Stop" + self.stop_all_operations() + return "Success" + + # ==================== 内部控制方法 ==================== + + def _start_operation(self): + """ + 启动操作线程 + + 这个方法启动一个后台线程来模拟搅拌器的实际运行过程。 + """ + with self._thread_lock: + if not self._running and self._power_state == "On": + self._running = True + self._operation_thread = threading.Thread(target=self._operation_loop) + self._operation_thread.daemon = True + self._operation_thread.start() + + def _stop_operation(self): + """ + 停止操作线程 + + 安全地停止后台运行线程并等待其完成。 + """ + with self._thread_lock: + self._running = False + if self._operation_thread and self._operation_thread.is_alive(): + self._operation_thread.join(timeout=2.0) + + def _operation_loop(self): + """ + 操作主循环 + + 这个方法在后台线程中运行,模拟真实搅拌器的工作过程: + 1. 搅拌速度控制 + 2. 温度控制和加热 + 3. 状态更新 + """ + while self._running and self._power_state == "On": + try: + # 处理搅拌速度控制 + if self._stir_state == "Running": + speed_diff = self._target_stir_speed - self._stir_speed + + if abs(speed_diff) < 1.0: # 速度接近目标值 + self._stir_speed = self._target_stir_speed + if self._stir_speed > 0: + self._status = "Stirring at Target Speed" + else: + # 模拟速度调节,每秒调整10%的差值 + adjustment = speed_diff * 0.1 + self._stir_speed += adjustment + self._status = "Adjusting Stir Speed" + + # 确保速度在合理范围内 + self._stir_speed = max(0.0, min(self._max_stir_speed, self._stir_speed)) + else: + # 搅拌停止时,速度逐渐降为0 + if self._stir_speed > 0: + self._stir_speed = max(0.0, self._stir_speed - 50.0) # 每秒减少50rpm + + # 处理温度控制 + if self._heating_state == "On": + temp_diff = self._target_temperature - self._temperature + + if abs(temp_diff) < 0.5: # 温度接近目标值 + self._heating_power = 20.0 # 维持温度的最小功率 + elif temp_diff > 0: # 需要加热 + # 根据温差调整加热功率 + if temp_diff > 50: + self._heating_power = 100.0 + elif temp_diff > 20: + self._heating_power = 80.0 + elif temp_diff > 10: + self._heating_power = 60.0 + else: + self._heating_power = 40.0 + + # 模拟加热过程 + heating_rate = self._heating_power / 100.0 * 1.5 # 最大每秒升温1.5度 + self._temperature += heating_rate + else: # 目标温度低于当前温度 + self._heating_power = 0.0 + # 自然冷却 + self._temperature -= 0.1 + else: + self._heating_power = 0.0 + # 自然冷却到室温 + if self._temperature > 25.0: + self._temperature -= 0.2 + + # 限制温度范围 + self._temperature = max(20.0, min(self._max_temperature, self._temperature)) + + # 更新整体状态 + if self._stir_state == "Running" and self._heating_state == "On": + self._status = "Stirring and Heating" + elif self._stir_state == "Running": + self._status = "Stirring Only" + elif self._heating_state == "On": + self._status = "Heating Only" + else: + self._status = "Idle" + + # 等待1秒后继续下一次循环 + time.sleep(1.0) + + except Exception as e: + self._status = f"Error in operation: {str(e)}" + break + + # 循环结束时的清理工作 + if self._power_state == "On": + self._status = "Idle" + + def get_status_info(self) -> dict: + """ + 获取完整的设备状态信息 + + Returns: + dict: 包含所有设备状态的字典 + """ + return { + "status": self._status, + "power_state": self._power_state, + "stir_speed": self._stir_speed, + "target_stir_speed": self._target_stir_speed, + "stir_state": self._stir_state, + "temperature": self._temperature, + "target_temperature": self._target_temperature, + "heating_state": self._heating_state, + "heating_power": self._heating_power, + "max_stir_speed": self._max_stir_speed, + "max_temperature": self._max_temperature, + } + + +# 用于测试的主函数 +if __name__ == "__main__": + stirrer = MockStirrer() + + # 测试基本功能 + print("启动搅拌器测试...") + stirrer.power_control("On") + print(f"初始状态: {stirrer.get_status_info()}") + + # 设置搅拌速度和温度 + stirrer.set_stir_speed(800.0) + stirrer.set_temperature(60.0) + stirrer.heating_control("On") + + # 模拟运行15秒 + for i in range(15): + time.sleep(1) + print( + f"第{i+1}秒: 速度={stirrer.stir_speed:.0f}rpm, 温度={stirrer.temperature:.1f}°C, " + f"功率={stirrer.heating_power:.1f}%, 状态={stirrer.status}" + ) + + stirrer.emergency_stop() + print("测试完成") diff --git a/unilabos/devices/mock/mock_vacuum.py b/unilabos/devices/mock/mock_vacuum.py new file mode 100644 index 0000000..9e368a9 --- /dev/null +++ b/unilabos/devices/mock/mock_vacuum.py @@ -0,0 +1,410 @@ +import time +import threading + + +class MockVacuum: + """ + 模拟真空泵设备类 + + 这个类模拟了一个实验室真空泵的行为,包括真空度控制、 + 压力监测、运行状态管理等功能。参考了现有的 VacuumPumpMock 实现。 + """ + + def __init__(self, port: str = "MOCK"): + """ + 初始化MockVacuum实例 + + Args: + port (str): 设备端口,默认为"MOCK"表示模拟设备 + """ + self.port = port + + # 设备基本状态属性 + self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped + self._power_state: str = "Off" # 电源状态:On, Off + self._pump_state: str = "Stopped" # 泵运行状态:Running, Stopped, Paused + + # 真空相关属性 + self._vacuum_level: float = 1013.25 # 当前真空度 (mbar) - 大气压开始 + self._target_vacuum: float = 50.0 # 目标真空度 (mbar) + self._min_vacuum: float = 1.0 # 最小真空度 (mbar) + self._max_vacuum: float = 1013.25 # 最大真空度 (mbar) - 大气压 + + # 泵性能相关属性 + self._pump_speed: float = 0.0 # 泵速 (L/s) + self._max_pump_speed: float = 100.0 # 最大泵速 (L/s) + self._pump_efficiency: float = 95.0 # 泵效率百分比 + + # 运行控制线程 + self._vacuum_thread = None + self._running = False + self._thread_lock = threading.Lock() + + # ==================== 状态属性 ==================== + # 这些属性会被Uni-Lab系统自动识别并定时对外广播 + + @property + def status(self) -> str: + """ + 设备状态 - 会被自动识别的设备属性 + + Returns: + str: 当前设备状态 (Idle, Running, Error, Stopped) + """ + return self._status + + @property + def power_state(self) -> str: + """ + 电源状态 + + Returns: + str: 电源状态 (On, Off) + """ + return self._power_state + + @property + def pump_state(self) -> str: + """ + 泵运行状态 + + Returns: + str: 泵状态 (Running, Stopped, Paused) + """ + return self._pump_state + + @property + def vacuum_level(self) -> float: + """ + 当前真空度 + + Returns: + float: 当前真空度 (mbar) + """ + return self._vacuum_level + + @property + def target_vacuum(self) -> float: + """ + 目标真空度 + + Returns: + float: 目标真空度 (mbar) + """ + return self._target_vacuum + + @property + def pump_speed(self) -> float: + """ + 泵速 + + Returns: + float: 泵速 (L/s) + """ + return self._pump_speed + + @property + def pump_efficiency(self) -> float: + """ + 泵效率 + + Returns: + float: 泵效率百分比 + """ + return self._pump_efficiency + + @property + def max_pump_speed(self) -> float: + """ + 最大泵速 + + Returns: + float: 最大泵速 (L/s) + """ + return self._max_pump_speed + + # ==================== 设备控制方法 ==================== + # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 + + def power_control(self, power_state: str = "On") -> str: + """ + 电源控制方法 + + Args: + power_state (str): 电源状态,可选值:"On", "Off" + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if power_state not in ["On", "Off"]: + self._status = "Error: Invalid power state" + return "Error" + + self._power_state = power_state + + if power_state == "On": + self._status = "Power On" + self._start_vacuum_operation() + else: + self._status = "Power Off" + self.stop_vacuum() + + return "Success" + + def set_vacuum_level(self, vacuum_level: float) -> str: + """ + 设置目标真空度 + + Args: + vacuum_level (float): 目标真空度 (mbar) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + try: + vacuum_level = float(vacuum_level) + except ValueError: + self._status = "Error: Invalid vacuum level" + return "Error" + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if vacuum_level < self._min_vacuum or vacuum_level > self._max_vacuum: + self._status = f"Error: Vacuum level out of range ({self._min_vacuum}-{self._max_vacuum})" + return "Error" + + self._target_vacuum = vacuum_level + self._status = "Setting Vacuum Level" + + return "Success" + + def start_vacuum(self) -> str: + """ + 启动真空泵 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + self._pump_state = "Running" + self._status = "Starting Vacuum Pump" + self._start_vacuum_operation() + + return "Success" + + def stop_vacuum(self) -> str: + """ + 停止真空泵 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._pump_state = "Stopped" + self._status = "Stopping Vacuum Pump" + self._stop_vacuum_operation() + self._pump_speed = 0.0 + + return "Success" + + def pause_vacuum(self) -> str: + """ + 暂停真空泵 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._pump_state != "Running": + self._status = "Error: Pump not running" + return "Error" + + self._pump_state = "Paused" + self._status = "Vacuum Pump Paused" + self._stop_vacuum_operation() + + return "Success" + + def resume_vacuum(self) -> str: + """ + 恢复真空泵运行 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._pump_state != "Paused": + self._status = "Error: Pump not paused" + return "Error" + + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + self._pump_state = "Running" + self._status = "Resuming Vacuum Pump" + self._start_vacuum_operation() + + return "Success" + + def vent_to_atmosphere(self) -> str: + """ + 通大气 - 将真空度恢复到大气压 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._target_vacuum = self._max_vacuum # 设置为大气压 + self._status = "Venting to Atmosphere" + return "Success" + + def emergency_stop(self) -> str: + """ + 紧急停止 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._status = "Emergency Stop" + self._pump_state = "Stopped" + self._stop_vacuum_operation() + self._pump_speed = 0.0 + + return "Success" + + # ==================== 内部控制方法 ==================== + + def _start_vacuum_operation(self): + """ + 启动真空操作线程 + + 这个方法启动一个后台线程来模拟真空泵的实际运行过程。 + """ + with self._thread_lock: + if not self._running and self._power_state == "On": + self._running = True + self._vacuum_thread = threading.Thread(target=self._vacuum_operation_loop) + self._vacuum_thread.daemon = True + self._vacuum_thread.start() + + def _stop_vacuum_operation(self): + """ + 停止真空操作线程 + + 安全地停止后台运行线程并等待其完成。 + """ + with self._thread_lock: + self._running = False + if self._vacuum_thread and self._vacuum_thread.is_alive(): + self._vacuum_thread.join(timeout=2.0) + + def _vacuum_operation_loop(self): + """ + 真空操作主循环 + + 这个方法在后台线程中运行,模拟真空泵的工作过程: + 1. 检查电源状态和运行状态 + 2. 如果泵状态为 "Running",根据目标真空调整泵速和真空度 + 3. 否则等待 + """ + while self._running and self._power_state == "On": + try: + with self._thread_lock: + # 只有泵状态为 Running 时才进行更新 + if self._pump_state == "Running": + vacuum_diff = self._vacuum_level - self._target_vacuum + + if abs(vacuum_diff) < 1.0: # 真空度接近目标值 + self._status = "At Target Vacuum" + self._pump_speed = self._max_pump_speed * 0.2 # 维持真空的最小泵速 + elif vacuum_diff > 0: # 需要抽真空(降低压力) + self._status = "Pumping Down" + if vacuum_diff > 500: + self._pump_speed = self._max_pump_speed + elif vacuum_diff > 100: + self._pump_speed = self._max_pump_speed * 0.8 + elif vacuum_diff > 50: + self._pump_speed = self._max_pump_speed * 0.6 + else: + self._pump_speed = self._max_pump_speed * 0.4 + + # 根据泵速和效率计算真空降幅 + pump_rate = (self._pump_speed / self._max_pump_speed) * self._pump_efficiency / 100.0 + vacuum_reduction = pump_rate * 10.0 # 每秒最大降低10 mbar + self._vacuum_level = max(self._target_vacuum, self._vacuum_level - vacuum_reduction) + else: # 目标真空度高于当前值,需要通气 + self._status = "Venting" + self._pump_speed = 0.0 + self._vacuum_level = min(self._target_vacuum, self._vacuum_level + 5.0) + + # 限制真空度范围 + self._vacuum_level = max(self._min_vacuum, min(self._max_vacuum, self._vacuum_level)) + else: + # 当泵状态不是 Running 时,可保持原状态 + self._status = "Vacuum Pump Not Running" + # 释放锁后等待1秒钟 + time.sleep(1.0) + except Exception as e: + with self._thread_lock: + self._status = f"Error in vacuum operation: {str(e)}" + break + + # 循环结束后的清理工作 + if self._pump_state == "Running": + self._status = "Idle" + # 停止泵后,真空度逐渐回升到大气压 + while self._vacuum_level < self._max_vacuum * 0.9: + with self._thread_lock: + self._vacuum_level += 2.0 + time.sleep(0.1) + + def get_status_info(self) -> dict: + """ + 获取完整的设备状态信息 + + Returns: + dict: 包含所有设备状态的字典 + """ + return { + "status": self._status, + "power_state": self._power_state, + "pump_state": self._pump_state, + "vacuum_level": self._vacuum_level, + "target_vacuum": self._target_vacuum, + "pump_speed": self._pump_speed, + "pump_efficiency": self._pump_efficiency, + "max_pump_speed": self._max_pump_speed, + } + + +# 用于测试的主函数 +if __name__ == "__main__": + vacuum = MockVacuum() + + # 测试基本功能 + print("启动真空泵测试...") + vacuum.power_control("On") + print(f"初始状态: {vacuum.get_status_info()}") + + # 设置目标真空度并启动 + vacuum.set_vacuum_level(10.0) # 设置为10mbar + vacuum.start_vacuum() + + # 模拟运行15秒 + for i in range(15): + time.sleep(1) + print( + f"第{i+1}秒: 真空度={vacuum.vacuum_level:.1f}mbar, 泵速={vacuum.pump_speed:.1f}L/s, 状态={vacuum.status}" + ) + # 测试通大气 + print("测试通大气...") + vacuum.vent_to_atmosphere() + + # 继续运行5秒观察通大气过程 + for i in range(5): + time.sleep(1) + print(f"通大气第{i+1}秒: 真空度={vacuum.vacuum_level:.1f}mbar, 状态={vacuum.status}") + + vacuum.emergency_stop() + print("测试完成")