diff --git a/unilabos/devices/Mock/__init__.py b/unilabos/devices/Mock/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/unilabos/devices/Mock/mock_chiller.py b/unilabos/devices/Mock/mock_chiller.py deleted file mode 100644 index 46bb7f0..0000000 --- a/unilabos/devices/Mock/mock_chiller.py +++ /dev/null @@ -1,170 +0,0 @@ -import time -import threading - - -class MockChiller: - def __init__(self, port: str = "MOCK"): - self.port = port - self._current_temperature: float = 25.0 # 室温开始 - self._target_temperature: float = 25.0 - self._status: str = "Idle" - self._is_cooling: bool = False - self._is_heating: bool = False - self._power_on: bool = False - - # 模拟温度变化的线程 - self._temperature_thread = None - self._running = False - - @property - def current_temperature(self) -> float: - """当前温度 - 会被自动识别的设备属性""" - return self._current_temperature - - @property - def target_temperature(self) -> float: - """目标温度""" - return self._target_temperature - - @property - def status(self) -> str: - """设备状态 - 会被自动识别的设备属性""" - return self._status - - @property - def power_on(self) -> bool: - """电源状态""" - return self._power_on - - @property - def is_cooling(self) -> bool: - """是否正在冷却""" - return self._is_cooling - - @property - def is_heating(self) -> bool: - """是否正在加热""" - return self._is_heating - - def set_temperature(self, temperature: float): - """设置目标温度 - 需要在注册表添加的设备动作""" - if not self._power_on: - self._status = "Error: Power Off" - return False - - # 将传入温度转换为 float,并限制在允许范围内 - temperature = float(temperature) - self._target_temperature = temperature - - # 立即更新状态 - diff = self._target_temperature - self._current_temperature - if abs(diff) < 0.1: - self._status = "At Target Temperature" - self._is_cooling = False - self._is_heating = False - elif diff < 0: - self._status = "Cooling" - self._is_cooling = True - self._is_heating = False - else: - self._status = "Heating" - self._is_heating = True - self._is_cooling = False - - # 启动温度控制 - self._start_temperature_control() - return True - - def power_on_off(self, power_state: str): - """开关机控制""" - if power_state == "on": - self._power_on = True - # 不在这里直接设置状态和加热/制冷标志 - self._start_temperature_control() - else: - self._power_on = False - self._status = "Power Off" - self._stop_temperature_control() - self._is_cooling = False - self._is_heating = False - - def _start_temperature_control(self): - """启动温度控制线程""" - if self._power_on: # 移除 not self._running 检查 - self._running = True - if self._temperature_thread is None or not self._temperature_thread.is_alive(): - self._temperature_thread = threading.Thread(target=self._temperature_control_loop) - self._temperature_thread.daemon = True - self._temperature_thread.start() - - def _stop_temperature_control(self): - """停止温度控制""" - self._running = False - if self._temperature_thread: - self._temperature_thread.join(timeout=1.0) - - def _temperature_control_loop(self): - """温度控制循环 - 模拟真实冷却器的温度变化""" - while self._running and self._power_on: - temp_diff = self._target_temperature - self._current_temperature - - if abs(temp_diff) < 0.1: # 将判断范围从0.5改小到0.1 - self._status = "At Target Temperature" - self._is_cooling = False - self._is_heating = False - elif temp_diff < 0: # 需要冷却 - self._status = "Cooling" - self._is_cooling = True - self._is_heating = False - # 模拟冷却过程,每秒降低0.5度 - self._current_temperature -= 0.5 - else: # 需要加热 - self._status = "Heating" - self._is_heating = True - self._is_cooling = False - # 模拟加热过程,每秒升高0.3度 - self._current_temperature += 0.3 - - # 限制温度范围 - self._current_temperature = max(-20.0, min(80.0, self._current_temperature)) - - time.sleep(1.0) # 每秒更新一次 - - def emergency_stop(self): - """紧急停止""" - self._status = "Emergency Stop" - self._stop_temperature_control() - self._is_cooling = False - self._is_heating = False - - def get_status_info(self) -> dict: - """获取完整状态信息""" - return { - "current_temperature": self._current_temperature, - "target_temperature": self._target_temperature, - "status": self._status, - "power_on": self._power_on, - "is_cooling": self._is_cooling, - "is_heating": self._is_heating, - } - - -# 用于测试的主函数 -if __name__ == "__main__": - chiller = MockChiller() - - # 测试基本功能 - print("启动冷却器测试...") - chiller.power_on_off("on") - print(f"初始状态: {chiller.get_status_info()}") - - # 设置目标温度为5度 - chiller.set_temperature(5.0) - - # 模拟运行10秒 - for i in range(10): - time.sleep(1) - print(f"第{i+1}秒: 当前温度={chiller.current_temperature:.1f}°C, 状态={chiller.status}") - - chiller.emergency_stop() - print("测试完成") diff --git a/unilabos/devices/Mock/mock_filter.py b/unilabos/devices/Mock/mock_filter.py deleted file mode 100644 index 6581ac4..0000000 --- a/unilabos/devices/Mock/mock_filter.py +++ /dev/null @@ -1,170 +0,0 @@ -import time -import threading - - -class MockFilter: - def __init__(self, port: str = "MOCK"): - self.port = port - self._status: str = "Idle" - self._is_filtering: bool = False - self._filter_efficiency: float = 95.0 # 过滤效率百分比 - self._flow_rate: float = 0.0 # 流速 L/min - self._pressure_drop: float = 0.0 # 压降 Pa - self._filter_life: float = 100.0 # 滤芯寿命百分比 - self._power_on: bool = False - - # 模拟过滤过程的线程 - self._filter_thread = None - self._running = False - - @property - def status(self) -> str: - """设备状态 - 会被自动识别的设备属性""" - return self._status - - @property - def is_filtering(self) -> bool: - """是否正在过滤""" - return self._is_filtering - - @property - def filter_efficiency(self) -> float: - """过滤效率""" - return self._filter_efficiency - - @property - def flow_rate(self) -> float: - """流速""" - return self._flow_rate - - @property - def pressure_drop(self) -> float: - """压降""" - return self._pressure_drop - - @property - def filter_life(self) -> float: - """滤芯寿命""" - return self._filter_life - - @property - def power_on(self) -> bool: - """电源状态""" - return self._power_on - - def start_filtering(self, flow_rate: float = 1.0): - """开始过滤 - 需要在注册表添加的设备动作""" - if not self._power_on: - self._status = "Error: Power Off" - return False - - self._flow_rate = flow_rate - self._status = "Starting Filter" - self._start_filter_process() - return True - - def stop_filtering(self): - """停止过滤""" - self._status = "Stopping Filter" - self._stop_filter_process() - self._flow_rate = 0.0 - self._is_filtering = False - self._status = "Idle" - return True - - def power_on_off(self, power_state: str): - """开关机控制""" - if power_state == "on": - self._power_on = True - self._status = "Power On" - else: - self._power_on = False - self._status = "Power Off" - self._stop_filter_process() - self._is_filtering = False - self._flow_rate = 0.0 - - def replace_filter(self): - """更换滤芯""" - self._filter_life = 100.0 - self._filter_efficiency = 95.0 - self._status = "Filter Replaced" - return True - - def _start_filter_process(self): - """启动过滤过程线程""" - if not self._running and self._power_on: - self._running = True - self._is_filtering = True - self._filter_thread = threading.Thread(target=self._filter_loop) - self._filter_thread.daemon = True - self._filter_thread.start() - - def _stop_filter_process(self): - """停止过滤过程""" - self._running = False - if self._filter_thread: - self._filter_thread.join(timeout=1.0) - - def _filter_loop(self): - """过滤过程循环 - 模拟真实过滤器的工作过程""" - while self._running and self._power_on and self._is_filtering: - self._status = "Filtering" - - # 模拟滤芯磨损 - if self._filter_life > 0: - self._filter_life -= 0.1 # 每秒减少0.1%寿命 - - # 根据滤芯寿命调整效率和压降 - life_factor = self._filter_life / 100.0 - self._filter_efficiency = 95.0 * life_factor + 50.0 * (1 - life_factor) - self._pressure_drop = 100.0 + (200.0 * (1 - life_factor)) # 压降随磨损增加 - - # 检查滤芯是否需要更换 - if self._filter_life <= 10.0: - self._status = "Filter Needs Replacement" - - time.sleep(1.0) # 每秒更新一次 - - def emergency_stop(self): - """紧急停止""" - self._status = "Emergency Stop" - self._stop_filter_process() - self._is_filtering = False - self._flow_rate = 0.0 - - def get_status_info(self) -> dict: - """获取完整状态信息""" - return { - "status": self._status, - "is_filtering": self._is_filtering, - "filter_efficiency": self._filter_efficiency, - "flow_rate": self._flow_rate, - "pressure_drop": self._pressure_drop, - "filter_life": self._filter_life, - "power_on": self._power_on, - } - - -# 用于测试的主函数 -if __name__ == "__main__": - filter_device = MockFilter() - - # 测试基本功能 - print("启动过滤器测试...") - filter_device.power_on_off("on") - print(f"初始状态: {filter_device.get_status_info()}") - - # 开始过滤 - filter_device.start_filtering(2.0) - - # 模拟运行10秒 - for i in range(10): - time.sleep(1) - print( - f"第{i+1}秒: 效率={filter_device.filter_efficiency:.1f}%, " - f"寿命={filter_device.filter_life:.1f}%, 状态={filter_device.status}" - ) - - filter_device.emergency_stop() - print("测试完成") diff --git a/unilabos/devices/Mock/mock_heater.py b/unilabos/devices/Mock/mock_heater.py deleted file mode 100644 index 6d9abff..0000000 --- a/unilabos/devices/Mock/mock_heater.py +++ /dev/null @@ -1,202 +0,0 @@ -import time -import threading - - -class MockHeater: - def __init__(self, port: str = "MOCK"): - self.port = port - self._current_temperature: float = 25.0 # 室温开始 - self._target_temperature: float = 25.0 - self._status: str = "Idle" - self._is_heating: bool = False - self._power_on: bool = False - self._heating_power: float = 0.0 # 加热功率百分比 0-100 - self._max_temperature: float = 300.0 # 最大加热温度 - - # 模拟加热过程的线程 - self._heating_thread = None - self._running = False - - @property - def current_temperature(self) -> float: - """当前温度 - 会被自动识别的设备属性""" - return self._current_temperature - - @property - def target_temperature(self) -> float: - """目标温度""" - return self._target_temperature - - @property - def status(self) -> str: - """设备状态 - 会被自动识别的设备属性""" - return self._status - - @property - def power_on(self) -> bool: - """电源状态""" - return self._power_on - - @property - def is_heating(self) -> bool: - """是否正在加热""" - return self._is_heating - - @property - def heating_power(self) -> float: - """加热功率百分比""" - return self._heating_power - - @property - def max_temperature(self) -> float: - """最大加热温度""" - return self._max_temperature - - def set_temperature(self, temperature: float): - """设置目标温度 - 需要在注册表添加的设备动作""" - try: - temperature = float(temperature) - except ValueError: - self._status = "Error: Invalid temperature value" - return False - - if not self._power_on: - self._status = "Error: Power Off" - return False - - if temperature > self._max_temperature: - self._status = f"Error: Temperature exceeds maximum ({self._max_temperature}°C)" - return False - - self._target_temperature = temperature - self._status = "Setting Temperature" - - # 启动加热控制 - self._start_heating_control() - return True - - def set_heating_power(self, power: float): - """设置加热功率""" - try: - power = float(power) - except ValueError: - self._status = "Error: Invalid power value" - return False - - if not self._power_on: - self._status = "Error: Power Off" - return False - - self._heating_power = max(0.0, min(100.0, power)) # 限制在0-100% - return True - - def power_on_off(self, power_state: str): - """开关机控制,接收字符串命令 "On" 或 "Off" """ - power_state = power_state.capitalize() - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" - self._power_on = True if power_state == "On" else False - if self._power_on: - self._status = "Power On" - self._start_heating_control() - else: - self._status = "Power Off" - self._stop_heating_control() - self._is_heating = False - self._heating_power = 0.0 - return "Success" - - def _start_heating_control(self): - """启动加热控制线程""" - if not self._running and self._power_on: - self._running = True - self._heating_thread = threading.Thread(target=self._heating_control_loop) - self._heating_thread.daemon = True - self._heating_thread.start() - - def _stop_heating_control(self): - """停止加热控制""" - self._running = False - if self._heating_thread: - self._heating_thread.join(timeout=1.0) - - def _heating_control_loop(self): - """加热控制循环 - 模拟真实加热器的温度变化""" - while self._running and self._power_on: - temp_diff = self._target_temperature - self._current_temperature - - if abs(temp_diff) < 0.5: # 温度接近目标值 - self._status = "At Target Temperature" - self._is_heating = False - self._heating_power = 10.0 # 维持温度的最小功率 - elif temp_diff > 0: # 需要加热 - self._status = "Heating" - self._is_heating = True - # 根据温差调整加热功率 - if temp_diff > 50: - self._heating_power = 100.0 - elif temp_diff > 20: - self._heating_power = 80.0 - elif temp_diff > 10: - self._heating_power = 60.0 - else: - self._heating_power = 40.0 - - # 模拟加热过程,加热速度与功率成正比 - heating_rate = self._heating_power / 100.0 * 2.0 # 最大每秒升温2度 - self._current_temperature += heating_rate - else: # 目标温度低于当前温度,自然冷却 - self._status = "Cooling Down" - self._is_heating = False - self._heating_power = 0.0 - # 模拟自然冷却,每秒降低0.2度 - self._current_temperature -= 0.2 - - # 限制温度范围 - self._current_temperature = max(20.0, min(self._max_temperature, self._current_temperature)) - - time.sleep(1.0) # 每秒更新一次 - - def emergency_stop(self): - """紧急停止""" - self._status = "Emergency Stop" - self._stop_heating_control() - self._is_heating = False - self._heating_power = 0.0 - - def get_status_info(self) -> dict: - """获取完整状态信息""" - return { - "current_temperature": self._current_temperature, - "target_temperature": self._target_temperature, - "status": self._status, - "power_on": self._power_on, - "is_heating": self._is_heating, - "heating_power": self._heating_power, - "max_temperature": self._max_temperature, - } - - -# 用于测试的主函数 -if __name__ == "__main__": - heater = MockHeater() - - # 测试基本功能 - print("启动加热器测试...") - heater.power_on_off("On") - print(f"初始状态: {heater.get_status_info()}") - - # 设置目标温度为80度 - heater.set_temperature(80.0) - - # 模拟运行15秒 - for i in range(15): - time.sleep(1) - print( - f"第{i+1}秒: 当前温度={heater.current_temperature:.1f}°C, 功率={heater.heating_power:.1f}%, " - f"状态={heater.status}" - ) - - heater.emergency_stop() - print("测试完成") diff --git a/unilabos/devices/Mock/mock_pump.py b/unilabos/devices/Mock/mock_pump.py deleted file mode 100644 index 68acd85..0000000 --- a/unilabos/devices/Mock/mock_pump.py +++ /dev/null @@ -1,414 +0,0 @@ -import time -import threading - - -class MockPump: - """ - 模拟泵设备类 - - 这个类模拟了一个实验室泵设备的行为,包括流量控制、压力监测、 - 运行状态管理等功能。所有的控制参数都使用字符串类型以提供更好的 - 可读性和扩展性。 - """ - - def __init__(self, port: str = "MOCK"): - """ - 初始化MockPump实例 - - Args: - port (str): 设备端口,默认为"MOCK"表示模拟设备 - """ - self.port = port - - # 设备基本状态属性 - self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped - self._power_state: str = "Off" # 电源状态:On, Off - self._pump_state: str = "Stopped" # 泵运行状态:Running, Stopped, Paused - - # 流量相关属性 - self._flow_rate: float = 0.0 # 当前流速 (mL/min) - self._target_flow_rate: float = 0.0 # 目标流速 (mL/min) - self._max_flow_rate: float = 100.0 # 最大流速 (mL/min) - self._total_volume: float = 0.0 # 累计流量 (mL) - - # 压力相关属性 - self._pressure: float = 0.0 # 当前压力 (bar) - self._max_pressure: float = 10.0 # 最大压力 (bar) - - # 方向控制属性 - self._direction: str = "Forward" # 泵方向:Forward, Reverse - - # 运行控制线程 - self._pump_thread = None - self._running = False - self._thread_lock = threading.Lock() - - # ==================== 状态属性 ==================== - # 这些属性会被Uni-Lab系统自动识别并定时对外广播 - - @property - def status(self) -> str: - return self._status - - @property - def power_state(self) -> str: - return self._power_state - - @property - def pump_state(self) -> str: - return self._pump_state - - @property - def flow_rate(self) -> float: - return self._flow_rate - - @property - def target_flow_rate(self) -> float: - return self._target_flow_rate - - @property - def pressure(self) -> float: - return self._pressure - - @property - def total_volume(self) -> float: - return self._total_volume - - @property - def direction(self) -> str: - return self._direction - - @property - def max_flow_rate(self) -> float: - return self._max_flow_rate - - @property - def max_pressure(self) -> float: - return self._max_pressure - - # ==================== 设备控制方法 ==================== - # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 - - def power_control(self, power_state: str = "On") -> str: - """ - 电源控制方法 - - Args: - power_state (str): 电源状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" - - self._power_state = power_state - - if power_state == "On": - self._status = "Power On" - else: - self._status = "Power Off" - # 关机时停止所有运行 - self.stop_pump() - - return "Success" - - def set_flow_rate(self, flow_rate: float) -> str: - """ - 设置目标流速 - - Args: - flow_rate (float): 目标流速 (mL/min) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - try: - flow_rate = float(flow_rate) - except ValueError: - self._status = "Error: Invalid flow rate" - return "Error" - - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if flow_rate < 0 or flow_rate > self._max_flow_rate: - self._status = f"Error: Flow rate out of range (0-{self._max_flow_rate})" - return "Error" - - self._target_flow_rate = flow_rate - self._status = "Setting Flow Rate" - - # 如果设置了非零流速,自动启动泵 - if flow_rate > 0: - # 自动切换泵状态为 "Running" 以触发操作循环 - self._pump_state = "Running" - self._start_pump_operation() - else: - self.stop_pump() - - return "Success" - - def start_pump(self) -> str: - """ - 启动泵运行 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if self._target_flow_rate <= 0: - self._status = "Error: No target flow rate set" - return "Error" - - self._pump_state = "Running" - self._status = "Starting Pump" - self._start_pump_operation() - - return "Success" - - def stop_pump(self) -> str: - """ - 停止泵运行 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._pump_state = "Stopped" - self._status = "Stopping Pump" - self._stop_pump_operation() - self._flow_rate = 0.0 - self._pressure = 0.0 - - return "Success" - - def pause_pump(self) -> str: - """ - 暂停泵运行 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._pump_state != "Running": - self._status = "Error: Pump not running" - return "Error" - - self._pump_state = "Paused" - self._status = "Pump Paused" - self._stop_pump_operation() - - return "Success" - - def resume_pump(self) -> str: - """ - 恢复泵运行 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._pump_state != "Paused": - self._status = "Error: Pump not paused" - return "Error" - - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - self._pump_state = "Running" - self._status = "Resuming Pump" - self._start_pump_operation() - - return "Success" - - def set_direction(self, direction: str = "Forward") -> str: - """ - 设置泵方向 - - Args: - direction (str): 泵方向,可选值:"Forward", "Reverse" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if direction not in ["Forward", "Reverse"]: - self._status = "Error: Invalid direction" - return "Error" - - # 如果泵正在运行,需要先停止 - was_running = self._pump_state == "Running" - if was_running: - self.stop_pump() - time.sleep(0.5) # 等待停止完成 - - self._direction = direction - self._status = f"Direction set to {direction}" - - # 如果之前在运行,重新启动 - if was_running: - self.start_pump() - - return "Success" - - def reset_volume_counter(self) -> str: - """ - 重置累计流量计数器 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._total_volume = 0.0 - self._status = "Volume counter reset" - return "Success" - - def emergency_stop(self) -> str: - """ - 紧急停止 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._status = "Emergency Stop" - self._pump_state = "Stopped" - self._stop_pump_operation() - self._flow_rate = 0.0 - self._pressure = 0.0 - self._target_flow_rate = 0.0 - - return "Success" - - # ==================== 内部控制方法 ==================== - - def _start_pump_operation(self): - """ - 启动泵运行线程 - - 这个方法启动一个后台线程来模拟泵的实际运行过程, - 包括流速控制、压力变化和累计流量计算。 - """ - with self._thread_lock: - if not self._running and self._power_state == "On": - self._running = True - self._pump_thread = threading.Thread(target=self._pump_operation_loop) - self._pump_thread.daemon = True - self._pump_thread.start() - - def _stop_pump_operation(self): - """ - 停止泵运行线程 - - 安全地停止后台运行线程并等待其完成。 - """ - with self._thread_lock: - self._running = False - if self._pump_thread and self._pump_thread.is_alive(): - self._pump_thread.join(timeout=2.0) - - def _pump_operation_loop(self): - """ - 泵运行主循环 - - 这个方法在后台线程中运行,模拟真实泵的工作过程: - 1. 逐步调整流速到目标值 - 2. 根据流速计算压力 - 3. 累计流量统计 - 4. 状态更新 - """ - while self._running and self._power_state == "On" and self._pump_state == "Running": - try: - # 模拟流速调节过程(逐步接近目标流速) - flow_diff = self._target_flow_rate - self._flow_rate - - if abs(flow_diff) < 0.1: # 流速接近目标值 - self._flow_rate = self._target_flow_rate - self._status = "At Target Flow Rate" - else: - # 模拟流速调节,每秒调整10%的差值 - adjustment = flow_diff * 0.1 - self._flow_rate += adjustment - self._status = "Adjusting Flow Rate" - - # 确保流速在合理范围内 - self._flow_rate = max(0.0, min(self._max_flow_rate, self._flow_rate)) - - # 模拟压力变化(压力与流速成正比,加上一些随机波动) - base_pressure = (self._flow_rate / self._max_flow_rate) * self._max_pressure - pressure_variation = 0.1 * base_pressure * (time.time() % 1.0 - 0.5) # ±5%波动 - self._pressure = max(0.0, base_pressure + pressure_variation) - - # 累计流量计算(每秒更新) - if self._flow_rate > 0: - volume_increment = self._flow_rate / 60.0 # 转换为mL/s - if self._direction == "Reverse": - volume_increment = -volume_increment - self._total_volume += volume_increment - - # 压力保护检查 - if self._pressure > self._max_pressure * 0.95: - self._status = "Warning: High Pressure" - - # 等待1秒后继续下一次循环 - time.sleep(1.0) - - except Exception as e: - self._status = f"Error in pump operation: {str(e)}" - break - - # 循环结束时的清理工作 - if self._pump_state == "Running": - self._status = "Idle" - - def get_status_info(self) -> dict: - """ - 获取完整的设备状态信息 - - Returns: - dict: 包含所有设备状态的字典 - """ - return { - "status": self._status, - "power_state": self._power_state, - "pump_state": self._pump_state, - "flow_rate": self._flow_rate, - "target_flow_rate": self._target_flow_rate, - "pressure": self._pressure, - "total_volume": self._total_volume, - "direction": self._direction, - "max_flow_rate": self._max_flow_rate, - "max_pressure": self._max_pressure, - } - - -# 用于测试的主函数 -if __name__ == "__main__": - pump = MockPump() - - # 测试基本功能 - print("启动泵设备测试...") - pump.power_control("On") - print(f"初始状态: {pump.get_status_info()}") - - # 设置流速并启动 - pump.set_flow_rate(50.0) - pump.start_pump() - - # 模拟运行10秒 - for i in range(10): - time.sleep(1) - print(f"第{i+1}秒: 流速={pump.flow_rate:.1f}mL/min, 压力={pump.pressure:.2f}bar, 状态={pump.status}") - - # 测试方向切换 - print("切换泵方向...") - pump.set_direction("Reverse") - - # 继续运行5秒 - for i in range(5): - time.sleep(1) - print(f"反向第{i+1}秒: 累计流量={pump.total_volume:.1f}mL, 方向={pump.direction}") - - pump.emergency_stop() - print("测试完成") diff --git a/unilabos/devices/Mock/mock_rotavap.py b/unilabos/devices/Mock/mock_rotavap.py deleted file mode 100644 index bd2f2f8..0000000 --- a/unilabos/devices/Mock/mock_rotavap.py +++ /dev/null @@ -1,443 +0,0 @@ -import time -import threading -import json - - -class MockRotavap: - """ - 模拟旋转蒸发器设备类 - - 这个类模拟了一个实验室旋转蒸发器的行为,包括旋转控制、 - 真空泵控制、温度控制等功能。参考了现有的 RotavapOne 实现。 - """ - - def __init__(self, port: str = "MOCK"): - """ - 初始化MockRotavap实例 - - Args: - port (str): 设备端口,默认为"MOCK"表示模拟设备 - """ - self.port = port - - # 设备基本状态属性 - self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped - self._power_state: str = "Off" # 电源状态:On, Off - - # 旋转相关属性 - self._rotate_state: str = "Stopped" # 旋转状态:Running, Stopped - self._rotate_time: float = 0.0 # 旋转剩余时间 (秒) - self._rotate_speed: float = 0.0 # 旋转速度 (rpm) - self._max_rotate_speed: float = 300.0 # 最大旋转速度 (rpm) - - # 真空泵相关属性 - self._pump_state: str = "Stopped" # 泵状态:Running, Stopped - self._pump_time: float = 0.0 # 泵剩余时间 (秒) - self._vacuum_level: float = 0.0 # 真空度 (mbar) - self._target_vacuum: float = 50.0 # 目标真空度 (mbar) - - # 温度相关属性 - self._temperature: float = 25.0 # 水浴温度 (°C) - self._target_temperature: float = 25.0 # 目标温度 (°C) - self._max_temperature: float = 180.0 # 最大温度 (°C) - - # 运行控制线程 - self._operation_thread = None - self._running = False - self._thread_lock = threading.Lock() - - # 操作成功标志 - self.success: str = "True" # 使用字符串而不是布尔值 - - # ==================== 状态属性 ==================== - # 这些属性会被Uni-Lab系统自动识别并定时对外广播 - - @property - def status(self) -> str: - return self._status - - @property - def power_state(self) -> str: - return self._power_state - - @property - def rotate_state(self) -> str: - return self._rotate_state - - @property - def rotate_time(self) -> float: - return self._rotate_time - - @property - def rotate_speed(self) -> float: - return self._rotate_speed - - @property - def pump_state(self) -> str: - return self._pump_state - - @property - def pump_time(self) -> float: - return self._pump_time - - @property - def vacuum_level(self) -> float: - return self._vacuum_level - - @property - def temperature(self) -> float: - return self._temperature - - @property - def target_temperature(self) -> float: - return self._target_temperature - - # ==================== 设备控制方法 ==================== - # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 - - def power_control(self, power_state: str = "On") -> str: - """ - 电源控制方法 - - Args: - power_state (str): 电源状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - self.success = "False" - return "Error" - - self._power_state = power_state - - if power_state == "On": - self._status = "Power On" - self._start_operation() - else: - self._status = "Power Off" - self.stop_all_operations() - - self.success = "True" - return "Success" - - def set_timer(self, command: str) -> str: - """ - 设置定时器 - 兼容现有RotavapOne接口 - - Args: - command (str): JSON格式的命令字符串,包含rotate_time和pump_time - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - self.success = "False" - return "Error" - - try: - timer = json.loads(command) - rotate_time = timer.get("rotate_time", 0) - pump_time = timer.get("pump_time", 0) - - self.success = "False" - self._rotate_time = float(rotate_time) - self._pump_time = float(pump_time) - self.success = "True" - - self._status = "Timer Set" - return "Success" - - except (json.JSONDecodeError, ValueError, KeyError) as e: - self._status = f"Error: Invalid command format - {str(e)}" - self.success = "False" - return "Error" - - def set_rotate_time(self, time_seconds: float) -> str: - """ - 设置旋转时间 - - Args: - time_seconds (float): 旋转时间 (秒) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - self.success = "False" - self._rotate_time = max(0.0, float(time_seconds)) - self.success = "True" - self._status = "Rotate time set" - return "Success" - - def set_pump_time(self, time_seconds: float) -> str: - """ - 设置泵时间 - - Args: - time_seconds (float): 泵时间 (秒) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - self.success = "False" - self._pump_time = max(0.0, float(time_seconds)) - self.success = "True" - self._status = "Pump time set" - return "Success" - - def set_rotate_speed(self, speed: float) -> str: - """ - 设置旋转速度 - - Args: - speed (float): 旋转速度 (rpm) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if speed < 0 or speed > self._max_rotate_speed: - self._status = f"Error: Speed out of range (0-{self._max_rotate_speed})" - return "Error" - - self._rotate_speed = speed - self._status = "Rotate speed set" - return "Success" - - def set_temperature(self, temperature: float) -> str: - """ - 设置水浴温度 - - Args: - temperature (float): 目标温度 (°C) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if temperature < 0 or temperature > self._max_temperature: - self._status = f"Error: Temperature out of range (0-{self._max_temperature})" - return "Error" - - self._target_temperature = temperature - self._status = "Temperature set" - return "Success" - - def start_rotation(self) -> str: - """ - 启动旋转 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if self._rotate_time <= 0: - self._status = "Error: No rotate time set" - return "Error" - - self._rotate_state = "Running" - self._status = "Rotation started" - return "Success" - - def start_pump(self) -> str: - """ - 启动真空泵 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if self._pump_time <= 0: - self._status = "Error: No pump time set" - return "Error" - - self._pump_state = "Running" - self._status = "Pump started" - return "Success" - - def stop_all_operations(self) -> str: - """ - 停止所有操作 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._rotate_state = "Stopped" - self._pump_state = "Stopped" - self._stop_operation() - self._rotate_time = 0.0 - self._pump_time = 0.0 - self._vacuum_level = 0.0 - self._status = "All operations stopped" - return "Success" - - def emergency_stop(self) -> str: - """ - 紧急停止 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._status = "Emergency Stop" - self.stop_all_operations() - return "Success" - - # ==================== 内部控制方法 ==================== - - def _start_operation(self): - """ - 启动操作线程 - - 这个方法启动一个后台线程来模拟旋蒸的实际运行过程。 - """ - with self._thread_lock: - if not self._running and self._power_state == "On": - self._running = True - self._operation_thread = threading.Thread(target=self._operation_loop) - self._operation_thread.daemon = True - self._operation_thread.start() - - def _stop_operation(self): - """ - 停止操作线程 - - 安全地停止后台运行线程并等待其完成。 - """ - with self._thread_lock: - self._running = False - if self._operation_thread and self._operation_thread.is_alive(): - self._operation_thread.join(timeout=2.0) - - def _operation_loop(self): - """ - 操作主循环 - - 这个方法在后台线程中运行,模拟真实旋蒸的工作过程: - 1. 时间倒计时 - 2. 温度控制 - 3. 真空度控制 - 4. 状态更新 - """ - while self._running and self._power_state == "On": - try: - # 处理旋转时间倒计时 - if self._rotate_time > 0: - self._rotate_state = "Running" - self._rotate_time = max(0.0, self._rotate_time - 1.0) - else: - self._rotate_state = "Stopped" - - # 处理泵时间倒计时 - if self._pump_time > 0: - self._pump_state = "Running" - self._pump_time = max(0.0, self._pump_time - 1.0) - # 模拟真空度变化 - if self._vacuum_level > self._target_vacuum: - self._vacuum_level = max(self._target_vacuum, self._vacuum_level - 5.0) - else: - self._pump_state = "Stopped" - # 真空度逐渐回升 - self._vacuum_level = min(1013.25, self._vacuum_level + 2.0) - - # 模拟温度控制 - temp_diff = self._target_temperature - self._temperature - if abs(temp_diff) > 0.5: - if temp_diff > 0: - self._temperature += min(1.0, temp_diff * 0.1) - else: - self._temperature += max(-1.0, temp_diff * 0.1) - - # 更新整体状态 - if self._rotate_state == "Running" or self._pump_state == "Running": - self._status = "Operating" - elif self._rotate_time > 0 or self._pump_time > 0: - self._status = "Ready" - else: - self._status = "Idle" - - # 等待1秒后继续下一次循环 - time.sleep(1.0) - - except Exception as e: - self._status = f"Error in operation: {str(e)}" - break - - # 循环结束时的清理工作 - if self._power_state == "On": - self._status = "Idle" - - def get_status_info(self) -> dict: - """ - 获取完整的设备状态信息 - - Returns: - dict: 包含所有设备状态的字典 - """ - return { - "status": self._status, - "power_state": self._power_state, - "rotate_state": self._rotate_state, - "rotate_time": self._rotate_time, - "rotate_speed": self._rotate_speed, - "pump_state": self._pump_state, - "pump_time": self._pump_time, - "vacuum_level": self._vacuum_level, - "temperature": self._temperature, - "target_temperature": self._target_temperature, - "success": self.success, - } - - -# 用于测试的主函数 -if __name__ == "__main__": - rotavap = MockRotavap() - - # 测试基本功能 - print("启动旋转蒸发器测试...") - rotavap.power_control("On") - print(f"初始状态: {rotavap.get_status_info()}") - - # 设置定时器 - timer_command = '{"rotate_time": 300, "pump_time": 600}' - rotavap.set_timer(timer_command) - - # 设置温度和转速 - rotavap.set_temperature(60.0) - rotavap.set_rotate_speed(120.0) - - # 启动操作 - rotavap.start_rotation() - rotavap.start_pump() - - # 模拟运行10秒 - for i in range(10): - time.sleep(1) - print( - f"第{i+1}秒: 旋转={rotavap.rotate_time:.0f}s, 泵={rotavap.pump_time:.0f}s, " - f"温度={rotavap.temperature:.1f}°C, 真空={rotavap.vacuum_level:.1f}mbar" - ) - - rotavap.emergency_stop() - print("测试完成") diff --git a/unilabos/devices/Mock/mock_separator.py b/unilabos/devices/Mock/mock_separator.py deleted file mode 100644 index fdda4ba..0000000 --- a/unilabos/devices/Mock/mock_separator.py +++ /dev/null @@ -1,184 +0,0 @@ -import time -import threading - - -class MockSeparator: - def __init__(self, port: str = "MOCK"): - self.port = port - - # 基本状态属性 - self._power_state: str = "Off" # 电源:On 或 Off - self._status: str = "Idle" # 当前总体状态 - self._valve_state: str = "Closed" # 阀门状态:Open 或 Closed - self._settling_time: float = 0.0 # 静置时间(秒) - - # 搅拌相关属性 - self._shake_time: float = 0.0 # 剩余摇摆时间(秒) - self._shake_status: str = "Not Shaking" # 摇摆状态 - - # 用于后台模拟 shake 动作 - self._operation_thread = None - self._thread_lock = threading.Lock() - - @property - def power_state(self) -> str: - return self._power_state - - @property - def valve_state(self) -> str: - return self._valve_state - - @property - def settling_time(self) -> float: - return self._settling_time - - @property - def status(self) -> str: - return self._status - - @property - def shake_time(self) -> float: - with self._thread_lock: - return self._shake_time - - @property - def shake_status(self) -> str: - with self._thread_lock: - return self._shake_status - - def power_control(self, power_state: str) -> str: - """ - 电源控制:只接受 "On" 或 "Off" - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" - - self._power_state = power_state - if power_state == "On": - self._status = "Powered On" - else: - self._status = "Powered Off" - self.stop_operations() - return "Success" - - def shake(self, shake_time: float) -> str: - """ - 模拟 shake(搅拌)操作: - - 进入 "Shaking" 状态,倒计时 shake_time 秒 - - shake 结束后,进入 "Settling" 状态,静置时间固定为 5 秒 - - 最后恢复为 Idle - """ - try: - shake_time = float(shake_time) - except ValueError: - self._status = "Error: Invalid shake time" - return "Error" - - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - with self._thread_lock: - self._status = "Shaking" - self._settling_time = 0.0 - self._shake_time = shake_time - self._shake_status = "Shaking" - - def _run_shake(): - remaining = shake_time - while remaining > 0: - time.sleep(1) - remaining -= 1 - with self._thread_lock: - self._shake_time = remaining - with self._thread_lock: - self._status = "Settling" - self._settling_time = 60.0 # 固定静置时间为60秒 - self._shake_status = "Settling" - while True: - with self._thread_lock: - if self._settling_time <= 0: - self._status = "Idle" - self._shake_status = "Idle" - break - time.sleep(1) - with self._thread_lock: - self._settling_time = max(0.0, self._settling_time - 1) - - self._operation_thread = threading.Thread(target=_run_shake) - self._operation_thread.daemon = True - self._operation_thread.start() - return "Success" - - def set_valve(self, command: str) -> str: - """ - 阀门控制命令:传入 "open" 或 "close" - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - command = command.lower() - if command == "open": - self._valve_state = "Open" - self._status = "Valve Opened" - elif command == "close": - self._valve_state = "Closed" - self._status = "Valve Closed" - else: - self._status = "Error: Invalid valve command" - return "Error" - return "Success" - - def stop_operations(self) -> str: - """ - 停止任何正在执行的操作 - """ - with self._thread_lock: - self._settling_time = 0.0 - self._status = "Idle" - self._shake_status = "Idle" - self._shake_time = 0.0 - return "Success" - - def get_status_info(self) -> dict: - """ - 获取当前设备状态信息 - """ - with self._thread_lock: - return { - "status": self._status, - "power_state": self._power_state, - "valve_state": self._valve_state, - "settling_time": self._settling_time, - "shake_time": self._shake_time, - "shake_status": self._shake_status, - } - - -# 主函数用于测试 -if __name__ == "__main__": - separator = MockSeparator() - - print("启动简单版分离器测试...") - print(separator.power_control("On")) - print("初始状态:", separator.get_status_info()) - - # 触发 shake 操作,模拟 10 秒的搅拌 - print("执行 shake 操作...") - print(separator.shake(10.0)) - - # 循环显示状态变化 - for i in range(20): - time.sleep(1) - info = separator.get_status_info() - print( - f"第{i+1}秒: 状态={info['status']}, 静置时间={info['settling_time']:.1f}秒, " - f"阀门状态={info['valve_state']}, shake_time={info['shake_time']:.1f}, " - f"shake_status={info['shake_status']}" - ) - - # 模拟打开阀门 - print("打开阀门...", separator.set_valve("open")) - print("最终状态:", separator.get_status_info()) diff --git a/unilabos/devices/Mock/mock_solenoid_valve.py b/unilabos/devices/Mock/mock_solenoid_valve.py deleted file mode 100644 index 0f0fbe5..0000000 --- a/unilabos/devices/Mock/mock_solenoid_valve.py +++ /dev/null @@ -1,89 +0,0 @@ -import time - - -class MockSolenoidValve: - """ - 模拟电磁阀设备类 - 简化版本 - - 这个类提供了电磁阀的基本功能:开启、关闭和状态查询 - """ - - def __init__(self, port: str = "MOCK"): - """ - 初始化MockSolenoidValve实例 - - Args: - port (str): 设备端口,默认为"MOCK"表示模拟设备 - """ - self.port = port - self._status: str = "Idle" - self._valve_status: str = "Closed" # 阀门位置:Open, Closed - - @property - def status(self) -> str: - """设备状态 - 会被自动识别的设备属性""" - return self._status - - @property - def valve_status(self) -> str: - """阀门状态""" - return self._valve_status - - def set_valve_status(self, status: str) -> str: - """ - 设置阀门位置 - - Args: - position (str): 阀门位置,可选值:"Open", "Closed" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if status not in ["Open", "Closed"]: - self._status = "Error: Invalid position" - return "Error" - - self._status = "Moving" - time.sleep(1) # 模拟阀门动作时间 - - self._valve_status = status - self._status = "Idle" - return "Success" - - def open_valve(self) -> str: - """打开阀门""" - return self.set_valve_status("Open") - - def close_valve(self) -> str: - """关闭阀门""" - return self.set_valve_status("Closed") - - def get_valve_status(self) -> str: - """获取阀门位置""" - return self._valve_status - - def is_open(self) -> bool: - """检查阀门是否打开""" - return self._valve_status == "Open" - - def is_closed(self) -> bool: - """检查阀门是否关闭""" - return self._valve_status == "Closed" - - -# 用于测试的主函数 -if __name__ == "__main__": - valve = MockSolenoidValve() - - print("启动电磁阀测试...") - print(f"初始状态: 位置={valve.valve_status}, 状态={valve.status}") - - # 测试开启阀门 - valve.open_valve() - print(f"开启后: 位置={valve.valve_status}, 状态={valve.status}") - - # 测试关闭阀门 - valve.close_valve() - print(f"关闭后: 位置={valve.valve_status}, 状态={valve.status}") - - print("测试完成") diff --git a/unilabos/devices/Mock/mock_stirrer.py b/unilabos/devices/Mock/mock_stirrer.py deleted file mode 100644 index 6907f7e..0000000 --- a/unilabos/devices/Mock/mock_stirrer.py +++ /dev/null @@ -1,482 +0,0 @@ -import time -import threading - - -class MockStirrer: - """ - 模拟搅拌器设备类 - - 这个类模拟了一个实验室搅拌器的行为,包括搅拌速度控制、 - 温度监测、加热控制等功能。参考了现有的 HeaterStirrer_DaLong 实现。 - """ - - def __init__(self, port: str = "MOCK"): - """ - 初始化MockStirrer实例 - - Args: - port (str): 设备端口,默认为"MOCK"表示模拟设备 - """ - self.port = port - - # 设备基本状态属性 - self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped - self._power_state: str = "Off" # 电源状态:On, Off - - # 搅拌相关属性 - self._stir_speed: float = 0.0 # 当前搅拌速度 (rpm) - self._target_stir_speed: float = 0.0 # 目标搅拌速度 (rpm) - self._max_stir_speed: float = 2000.0 # 最大搅拌速度 (rpm) - self._stir_state: str = "Stopped" # 搅拌状态:Running, Stopped - - # 温度相关属性 - self._temperature: float = 25.0 # 当前温度 (°C) - self._target_temperature: float = 25.0 # 目标温度 (°C) - self._max_temperature: float = 300.0 # 最大温度 (°C) - self._heating_state: str = "Off" # 加热状态:On, Off - self._heating_power: float = 0.0 # 加热功率百分比 0-100 - - # 运行控制线程 - self._operation_thread = None - self._running = False - self._thread_lock = threading.Lock() - - # ==================== 状态属性 ==================== - # 这些属性会被Uni-Lab系统自动识别并定时对外广播 - - @property - def status(self) -> str: - """ - 设备状态 - 会被自动识别的设备属性 - - Returns: - str: 当前设备状态 (Idle, Running, Error, Stopped) - """ - return self._status - - @property - def power_state(self) -> str: - """ - 电源状态 - - Returns: - str: 电源状态 (On, Off) - """ - return self._power_state - - @property - def stir_speed(self) -> float: - """ - 当前搅拌速度 - - Returns: - float: 当前搅拌速度 (rpm) - """ - return self._stir_speed - - @property - def target_stir_speed(self) -> float: - """ - 目标搅拌速度 - - Returns: - float: 目标搅拌速度 (rpm) - """ - return self._target_stir_speed - - @property - def stir_state(self) -> str: - """ - 搅拌状态 - - Returns: - str: 搅拌状态 (Running, Stopped) - """ - return self._stir_state - - @property - def temperature(self) -> float: - """ - 当前温度 - - Returns: - float: 当前温度 (°C) - """ - return self._temperature - - @property - def target_temperature(self) -> float: - """ - 目标温度 - - Returns: - float: 目标温度 (°C) - """ - return self._target_temperature - - @property - def heating_state(self) -> str: - """ - 加热状态 - - Returns: - str: 加热状态 (On, Off) - """ - return self._heating_state - - @property - def heating_power(self) -> float: - """ - 加热功率 - - Returns: - float: 加热功率百分比 (0-100) - """ - return self._heating_power - - @property - def max_stir_speed(self) -> float: - """ - 最大搅拌速度 - - Returns: - float: 最大搅拌速度 (rpm) - """ - return self._max_stir_speed - - @property - def max_temperature(self) -> float: - """ - 最大温度 - - Returns: - float: 最大温度 (°C) - """ - return self._max_temperature - - # ==================== 设备控制方法 ==================== - # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 - - def power_control(self, power_state: str = "On") -> str: - """ - 电源控制方法 - - Args: - power_state (str): 电源状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" - - self._power_state = power_state - - if power_state == "On": - self._status = "Power On" - self._start_operation() - else: - self._status = "Power Off" - self.stop_all_operations() - - return "Success" - - def set_stir_speed(self, speed: float) -> str: - """ - 设置搅拌速度 - - Args: - speed (float): 目标搅拌速度 (rpm) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - speed = float(speed) # 确保传入的速度是浮点数 - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if speed < 0 or speed > self._max_stir_speed: - self._status = f"Error: Speed out of range (0-{self._max_stir_speed})" - return "Error" - - self._target_stir_speed = speed - self._status = "Setting Stir Speed" - - # 如果设置了非零速度,启动搅拌 - if speed > 0: - self._stir_state = "Running" - else: - self._stir_state = "Stopped" - - return "Success" - - def set_temperature(self, temperature: float) -> str: - """ - 设置目标温度 - - Args: - temperature (float): 目标温度 (°C) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - temperature = float(temperature) # 确保传入的温度是浮点数 - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if temperature < 0 or temperature > self._max_temperature: - self._status = f"Error: Temperature out of range (0-{self._max_temperature})" - return "Error" - - self._target_temperature = temperature - self._status = "Setting Temperature" - - return "Success" - - def start_stirring(self) -> str: - """ - 启动搅拌 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if self._target_stir_speed <= 0: - self._status = "Error: No target speed set" - return "Error" - - self._stir_state = "Running" - self._status = "Stirring Started" - return "Success" - - def stop_stirring(self) -> str: - """ - 停止搅拌 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._stir_state = "Stopped" - self._target_stir_speed = 0.0 - self._status = "Stirring Stopped" - return "Success" - - def heating_control(self, heating_state: str = "On") -> str: - """ - 加热控制 - - Args: - heating_state (str): 加热状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if heating_state not in ["On", "Off"]: - self._status = "Error: Invalid heating state" - return "Error" - - self._heating_state = heating_state - - if heating_state == "On": - self._status = "Heating On" - else: - self._status = "Heating Off" - self._heating_power = 0.0 - - return "Success" - - def stop_all_operations(self) -> str: - """ - 停止所有操作 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._stir_state = "Stopped" - self._heating_state = "Off" - self._stop_operation() - self._stir_speed = 0.0 - self._target_stir_speed = 0.0 - self._heating_power = 0.0 - self._status = "All operations stopped" - return "Success" - - def emergency_stop(self) -> str: - """ - 紧急停止 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._status = "Emergency Stop" - self.stop_all_operations() - return "Success" - - # ==================== 内部控制方法 ==================== - - def _start_operation(self): - """ - 启动操作线程 - - 这个方法启动一个后台线程来模拟搅拌器的实际运行过程。 - """ - with self._thread_lock: - if not self._running and self._power_state == "On": - self._running = True - self._operation_thread = threading.Thread(target=self._operation_loop) - self._operation_thread.daemon = True - self._operation_thread.start() - - def _stop_operation(self): - """ - 停止操作线程 - - 安全地停止后台运行线程并等待其完成。 - """ - with self._thread_lock: - self._running = False - if self._operation_thread and self._operation_thread.is_alive(): - self._operation_thread.join(timeout=2.0) - - def _operation_loop(self): - """ - 操作主循环 - - 这个方法在后台线程中运行,模拟真实搅拌器的工作过程: - 1. 搅拌速度控制 - 2. 温度控制和加热 - 3. 状态更新 - """ - while self._running and self._power_state == "On": - try: - # 处理搅拌速度控制 - if self._stir_state == "Running": - speed_diff = self._target_stir_speed - self._stir_speed - - if abs(speed_diff) < 1.0: # 速度接近目标值 - self._stir_speed = self._target_stir_speed - if self._stir_speed > 0: - self._status = "Stirring at Target Speed" - else: - # 模拟速度调节,每秒调整10%的差值 - adjustment = speed_diff * 0.1 - self._stir_speed += adjustment - self._status = "Adjusting Stir Speed" - - # 确保速度在合理范围内 - self._stir_speed = max(0.0, min(self._max_stir_speed, self._stir_speed)) - else: - # 搅拌停止时,速度逐渐降为0 - if self._stir_speed > 0: - self._stir_speed = max(0.0, self._stir_speed - 50.0) # 每秒减少50rpm - - # 处理温度控制 - if self._heating_state == "On": - temp_diff = self._target_temperature - self._temperature - - if abs(temp_diff) < 0.5: # 温度接近目标值 - self._heating_power = 20.0 # 维持温度的最小功率 - elif temp_diff > 0: # 需要加热 - # 根据温差调整加热功率 - if temp_diff > 50: - self._heating_power = 100.0 - elif temp_diff > 20: - self._heating_power = 80.0 - elif temp_diff > 10: - self._heating_power = 60.0 - else: - self._heating_power = 40.0 - - # 模拟加热过程 - heating_rate = self._heating_power / 100.0 * 1.5 # 最大每秒升温1.5度 - self._temperature += heating_rate - else: # 目标温度低于当前温度 - self._heating_power = 0.0 - # 自然冷却 - self._temperature -= 0.1 - else: - self._heating_power = 0.0 - # 自然冷却到室温 - if self._temperature > 25.0: - self._temperature -= 0.2 - - # 限制温度范围 - self._temperature = max(20.0, min(self._max_temperature, self._temperature)) - - # 更新整体状态 - if self._stir_state == "Running" and self._heating_state == "On": - self._status = "Stirring and Heating" - elif self._stir_state == "Running": - self._status = "Stirring Only" - elif self._heating_state == "On": - self._status = "Heating Only" - else: - self._status = "Idle" - - # 等待1秒后继续下一次循环 - time.sleep(1.0) - - except Exception as e: - self._status = f"Error in operation: {str(e)}" - break - - # 循环结束时的清理工作 - if self._power_state == "On": - self._status = "Idle" - - def get_status_info(self) -> dict: - """ - 获取完整的设备状态信息 - - Returns: - dict: 包含所有设备状态的字典 - """ - return { - "status": self._status, - "power_state": self._power_state, - "stir_speed": self._stir_speed, - "target_stir_speed": self._target_stir_speed, - "stir_state": self._stir_state, - "temperature": self._temperature, - "target_temperature": self._target_temperature, - "heating_state": self._heating_state, - "heating_power": self._heating_power, - "max_stir_speed": self._max_stir_speed, - "max_temperature": self._max_temperature, - } - - -# 用于测试的主函数 -if __name__ == "__main__": - stirrer = MockStirrer() - - # 测试基本功能 - print("启动搅拌器测试...") - stirrer.power_control("On") - print(f"初始状态: {stirrer.get_status_info()}") - - # 设置搅拌速度和温度 - stirrer.set_stir_speed(800.0) - stirrer.set_temperature(60.0) - stirrer.heating_control("On") - - # 模拟运行15秒 - for i in range(15): - time.sleep(1) - print( - f"第{i+1}秒: 速度={stirrer.stir_speed:.0f}rpm, 温度={stirrer.temperature:.1f}°C, " - f"功率={stirrer.heating_power:.1f}%, 状态={stirrer.status}" - ) - - stirrer.emergency_stop() - print("测试完成") diff --git a/unilabos/devices/Mock/mock_vacuum.py b/unilabos/devices/Mock/mock_vacuum.py deleted file mode 100644 index 9e368a9..0000000 --- a/unilabos/devices/Mock/mock_vacuum.py +++ /dev/null @@ -1,410 +0,0 @@ -import time -import threading - - -class MockVacuum: - """ - 模拟真空泵设备类 - - 这个类模拟了一个实验室真空泵的行为,包括真空度控制、 - 压力监测、运行状态管理等功能。参考了现有的 VacuumPumpMock 实现。 - """ - - def __init__(self, port: str = "MOCK"): - """ - 初始化MockVacuum实例 - - Args: - port (str): 设备端口,默认为"MOCK"表示模拟设备 - """ - self.port = port - - # 设备基本状态属性 - self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped - self._power_state: str = "Off" # 电源状态:On, Off - self._pump_state: str = "Stopped" # 泵运行状态:Running, Stopped, Paused - - # 真空相关属性 - self._vacuum_level: float = 1013.25 # 当前真空度 (mbar) - 大气压开始 - self._target_vacuum: float = 50.0 # 目标真空度 (mbar) - self._min_vacuum: float = 1.0 # 最小真空度 (mbar) - self._max_vacuum: float = 1013.25 # 最大真空度 (mbar) - 大气压 - - # 泵性能相关属性 - self._pump_speed: float = 0.0 # 泵速 (L/s) - self._max_pump_speed: float = 100.0 # 最大泵速 (L/s) - self._pump_efficiency: float = 95.0 # 泵效率百分比 - - # 运行控制线程 - self._vacuum_thread = None - self._running = False - self._thread_lock = threading.Lock() - - # ==================== 状态属性 ==================== - # 这些属性会被Uni-Lab系统自动识别并定时对外广播 - - @property - def status(self) -> str: - """ - 设备状态 - 会被自动识别的设备属性 - - Returns: - str: 当前设备状态 (Idle, Running, Error, Stopped) - """ - return self._status - - @property - def power_state(self) -> str: - """ - 电源状态 - - Returns: - str: 电源状态 (On, Off) - """ - return self._power_state - - @property - def pump_state(self) -> str: - """ - 泵运行状态 - - Returns: - str: 泵状态 (Running, Stopped, Paused) - """ - return self._pump_state - - @property - def vacuum_level(self) -> float: - """ - 当前真空度 - - Returns: - float: 当前真空度 (mbar) - """ - return self._vacuum_level - - @property - def target_vacuum(self) -> float: - """ - 目标真空度 - - Returns: - float: 目标真空度 (mbar) - """ - return self._target_vacuum - - @property - def pump_speed(self) -> float: - """ - 泵速 - - Returns: - float: 泵速 (L/s) - """ - return self._pump_speed - - @property - def pump_efficiency(self) -> float: - """ - 泵效率 - - Returns: - float: 泵效率百分比 - """ - return self._pump_efficiency - - @property - def max_pump_speed(self) -> float: - """ - 最大泵速 - - Returns: - float: 最大泵速 (L/s) - """ - return self._max_pump_speed - - # ==================== 设备控制方法 ==================== - # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 - - def power_control(self, power_state: str = "On") -> str: - """ - 电源控制方法 - - Args: - power_state (str): 电源状态,可选值:"On", "Off" - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if power_state not in ["On", "Off"]: - self._status = "Error: Invalid power state" - return "Error" - - self._power_state = power_state - - if power_state == "On": - self._status = "Power On" - self._start_vacuum_operation() - else: - self._status = "Power Off" - self.stop_vacuum() - - return "Success" - - def set_vacuum_level(self, vacuum_level: float) -> str: - """ - 设置目标真空度 - - Args: - vacuum_level (float): 目标真空度 (mbar) - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - try: - vacuum_level = float(vacuum_level) - except ValueError: - self._status = "Error: Invalid vacuum level" - return "Error" - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - if vacuum_level < self._min_vacuum or vacuum_level > self._max_vacuum: - self._status = f"Error: Vacuum level out of range ({self._min_vacuum}-{self._max_vacuum})" - return "Error" - - self._target_vacuum = vacuum_level - self._status = "Setting Vacuum Level" - - return "Success" - - def start_vacuum(self) -> str: - """ - 启动真空泵 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - self._pump_state = "Running" - self._status = "Starting Vacuum Pump" - self._start_vacuum_operation() - - return "Success" - - def stop_vacuum(self) -> str: - """ - 停止真空泵 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._pump_state = "Stopped" - self._status = "Stopping Vacuum Pump" - self._stop_vacuum_operation() - self._pump_speed = 0.0 - - return "Success" - - def pause_vacuum(self) -> str: - """ - 暂停真空泵 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._pump_state != "Running": - self._status = "Error: Pump not running" - return "Error" - - self._pump_state = "Paused" - self._status = "Vacuum Pump Paused" - self._stop_vacuum_operation() - - return "Success" - - def resume_vacuum(self) -> str: - """ - 恢复真空泵运行 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - if self._pump_state != "Paused": - self._status = "Error: Pump not paused" - return "Error" - - if self._power_state != "On": - self._status = "Error: Power Off" - return "Error" - - self._pump_state = "Running" - self._status = "Resuming Vacuum Pump" - self._start_vacuum_operation() - - return "Success" - - def vent_to_atmosphere(self) -> str: - """ - 通大气 - 将真空度恢复到大气压 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._target_vacuum = self._max_vacuum # 设置为大气压 - self._status = "Venting to Atmosphere" - return "Success" - - def emergency_stop(self) -> str: - """ - 紧急停止 - - Returns: - str: 操作结果状态 ("Success", "Error") - """ - self._status = "Emergency Stop" - self._pump_state = "Stopped" - self._stop_vacuum_operation() - self._pump_speed = 0.0 - - return "Success" - - # ==================== 内部控制方法 ==================== - - def _start_vacuum_operation(self): - """ - 启动真空操作线程 - - 这个方法启动一个后台线程来模拟真空泵的实际运行过程。 - """ - with self._thread_lock: - if not self._running and self._power_state == "On": - self._running = True - self._vacuum_thread = threading.Thread(target=self._vacuum_operation_loop) - self._vacuum_thread.daemon = True - self._vacuum_thread.start() - - def _stop_vacuum_operation(self): - """ - 停止真空操作线程 - - 安全地停止后台运行线程并等待其完成。 - """ - with self._thread_lock: - self._running = False - if self._vacuum_thread and self._vacuum_thread.is_alive(): - self._vacuum_thread.join(timeout=2.0) - - def _vacuum_operation_loop(self): - """ - 真空操作主循环 - - 这个方法在后台线程中运行,模拟真空泵的工作过程: - 1. 检查电源状态和运行状态 - 2. 如果泵状态为 "Running",根据目标真空调整泵速和真空度 - 3. 否则等待 - """ - while self._running and self._power_state == "On": - try: - with self._thread_lock: - # 只有泵状态为 Running 时才进行更新 - if self._pump_state == "Running": - vacuum_diff = self._vacuum_level - self._target_vacuum - - if abs(vacuum_diff) < 1.0: # 真空度接近目标值 - self._status = "At Target Vacuum" - self._pump_speed = self._max_pump_speed * 0.2 # 维持真空的最小泵速 - elif vacuum_diff > 0: # 需要抽真空(降低压力) - self._status = "Pumping Down" - if vacuum_diff > 500: - self._pump_speed = self._max_pump_speed - elif vacuum_diff > 100: - self._pump_speed = self._max_pump_speed * 0.8 - elif vacuum_diff > 50: - self._pump_speed = self._max_pump_speed * 0.6 - else: - self._pump_speed = self._max_pump_speed * 0.4 - - # 根据泵速和效率计算真空降幅 - pump_rate = (self._pump_speed / self._max_pump_speed) * self._pump_efficiency / 100.0 - vacuum_reduction = pump_rate * 10.0 # 每秒最大降低10 mbar - self._vacuum_level = max(self._target_vacuum, self._vacuum_level - vacuum_reduction) - else: # 目标真空度高于当前值,需要通气 - self._status = "Venting" - self._pump_speed = 0.0 - self._vacuum_level = min(self._target_vacuum, self._vacuum_level + 5.0) - - # 限制真空度范围 - self._vacuum_level = max(self._min_vacuum, min(self._max_vacuum, self._vacuum_level)) - else: - # 当泵状态不是 Running 时,可保持原状态 - self._status = "Vacuum Pump Not Running" - # 释放锁后等待1秒钟 - time.sleep(1.0) - except Exception as e: - with self._thread_lock: - self._status = f"Error in vacuum operation: {str(e)}" - break - - # 循环结束后的清理工作 - if self._pump_state == "Running": - self._status = "Idle" - # 停止泵后,真空度逐渐回升到大气压 - while self._vacuum_level < self._max_vacuum * 0.9: - with self._thread_lock: - self._vacuum_level += 2.0 - time.sleep(0.1) - - def get_status_info(self) -> dict: - """ - 获取完整的设备状态信息 - - Returns: - dict: 包含所有设备状态的字典 - """ - return { - "status": self._status, - "power_state": self._power_state, - "pump_state": self._pump_state, - "vacuum_level": self._vacuum_level, - "target_vacuum": self._target_vacuum, - "pump_speed": self._pump_speed, - "pump_efficiency": self._pump_efficiency, - "max_pump_speed": self._max_pump_speed, - } - - -# 用于测试的主函数 -if __name__ == "__main__": - vacuum = MockVacuum() - - # 测试基本功能 - print("启动真空泵测试...") - vacuum.power_control("On") - print(f"初始状态: {vacuum.get_status_info()}") - - # 设置目标真空度并启动 - vacuum.set_vacuum_level(10.0) # 设置为10mbar - vacuum.start_vacuum() - - # 模拟运行15秒 - for i in range(15): - time.sleep(1) - print( - f"第{i+1}秒: 真空度={vacuum.vacuum_level:.1f}mbar, 泵速={vacuum.pump_speed:.1f}L/s, 状态={vacuum.status}" - ) - # 测试通大气 - print("测试通大气...") - vacuum.vent_to_atmosphere() - - # 继续运行5秒观察通大气过程 - for i in range(5): - time.sleep(1) - print(f"通大气第{i+1}秒: 真空度={vacuum.vacuum_level:.1f}mbar, 状态={vacuum.status}") - - vacuum.emergency_stop() - print("测试完成")