diff --git a/test/experiments/MockVacuum.json b/test/experiments/MockVacuum.json new file mode 100644 index 0000000..702832a --- /dev/null +++ b/test/experiments/MockVacuum.json @@ -0,0 +1,31 @@ +{ + "nodes": [ + { + "id": "MockVacuum1", + "name": "模拟真空泵", + "children": [], + "parent": null, + "type": "device", + "class": "MockVacuum", + "position": { + "x": 620.6111111111111, + "y": 171, + "z": 0 + }, + "config": { + "port": "MOCK" + }, + "data": { + "status": "Idle", + "power_state": "Off", + "pump_state": "Stopped", + "vacuum_level": 1013.25, + "target_vacuum": 50.0, + "pump_speed": 0.0, + "pump_efficiency": 95.0, + "max_pump_speed": 100.0 + } + } + ], + "links": [] +} \ No newline at end of file diff --git a/unilabos/devices/Mock/MockVacuum/MockVacuum.py b/unilabos/devices/Mock/MockVacuum/MockVacuum.py new file mode 100644 index 0000000..41bb8ff --- /dev/null +++ b/unilabos/devices/Mock/MockVacuum/MockVacuum.py @@ -0,0 +1,408 @@ +import time +import threading + + +class MockVacuum: + """ + 模拟真空泵设备类 + + 这个类模拟了一个实验室真空泵的行为,包括真空度控制、 + 压力监测、运行状态管理等功能。参考了现有的 VacuumPumpMock 实现。 + """ + + def __init__(self, port: str = "MOCK"): + """ + 初始化MockVacuum实例 + + Args: + port (str): 设备端口,默认为"MOCK"表示模拟设备 + """ + self.port = port + + # 设备基本状态属性 + self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped + self._power_state: str = "Off" # 电源状态:On, Off + self._pump_state: str = "Stopped" # 泵运行状态:Running, Stopped, Paused + + # 真空相关属性 + self._vacuum_level: float = 1013.25 # 当前真空度 (mbar) - 大气压开始 + self._target_vacuum: float = 50.0 # 目标真空度 (mbar) + self._min_vacuum: float = 1.0 # 最小真空度 (mbar) + self._max_vacuum: float = 1013.25 # 最大真空度 (mbar) - 大气压 + + # 泵性能相关属性 + self._pump_speed: float = 0.0 # 泵速 (L/s) + self._max_pump_speed: float = 100.0 # 最大泵速 (L/s) + self._pump_efficiency: float = 95.0 # 泵效率百分比 + + # 运行控制线程 + self._vacuum_thread = None + self._running = False + self._thread_lock = threading.Lock() + + # ==================== 状态属性 ==================== + # 这些属性会被Uni-Lab系统自动识别并定时对外广播 + + @property + def status(self) -> str: + """ + 设备状态 - 会被自动识别的设备属性 + + Returns: + str: 当前设备状态 (Idle, Running, Error, Stopped) + """ + return self._status + + @property + def power_state(self) -> str: + """ + 电源状态 + + Returns: + str: 电源状态 (On, Off) + """ + return self._power_state + + @property + def pump_state(self) -> str: + """ + 泵运行状态 + + Returns: + str: 泵状态 (Running, Stopped, Paused) + """ + return self._pump_state + + @property + def vacuum_level(self) -> float: + """ + 当前真空度 + + Returns: + float: 当前真空度 (mbar) + """ + return self._vacuum_level + + @property + def target_vacuum(self) -> float: + """ + 目标真空度 + + Returns: + float: 目标真空度 (mbar) + """ + return self._target_vacuum + + @property + def pump_speed(self) -> float: + """ + 泵速 + + Returns: + float: 泵速 (L/s) + """ + return self._pump_speed + + @property + def pump_efficiency(self) -> float: + """ + 泵效率 + + Returns: + float: 泵效率百分比 + """ + return self._pump_efficiency + + @property + def max_pump_speed(self) -> float: + """ + 最大泵速 + + Returns: + float: 最大泵速 (L/s) + """ + return self._max_pump_speed + + # ==================== 设备控制方法 ==================== + # 这些方法需要在注册表中添加,会作为ActionServer接受控制指令 + + def power_control(self, power_state: str = "On") -> str: + """ + 电源控制方法 + + Args: + power_state (str): 电源状态,可选值:"On", "Off" + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if power_state not in ["On", "Off"]: + self._status = "Error: Invalid power state" + return "Error" + + self._power_state = power_state + + if power_state == "On": + self._status = "Power On" + self._start_vacuum_operation() + else: + self._status = "Power Off" + self.stop_vacuum() + + return "Success" + + def set_vacuum_level(self, vacuum_level: float) -> str: + """ + 设置目标真空度 + + Args: + vacuum_level (float): 目标真空度 (mbar) + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + try: + vacuum_level = float(vacuum_level) + except ValueError: + self._status = "Error: Invalid vacuum level" + return "Error" + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + if vacuum_level < self._min_vacuum or vacuum_level > self._max_vacuum: + self._status = f"Error: Vacuum level out of range ({self._min_vacuum}-{self._max_vacuum})" + return "Error" + + self._target_vacuum = vacuum_level + self._status = "Setting Vacuum Level" + + return "Success" + + def start_vacuum(self) -> str: + """ + 启动真空泵 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + self._pump_state = "Running" + self._status = "Starting Vacuum Pump" + self._start_vacuum_operation() + + return "Success" + + def stop_vacuum(self) -> str: + """ + 停止真空泵 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._pump_state = "Stopped" + self._status = "Stopping Vacuum Pump" + self._stop_vacuum_operation() + self._pump_speed = 0.0 + + return "Success" + + def pause_vacuum(self) -> str: + """ + 暂停真空泵 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._pump_state != "Running": + self._status = "Error: Pump not running" + return "Error" + + self._pump_state = "Paused" + self._status = "Vacuum Pump Paused" + self._stop_vacuum_operation() + + return "Success" + + def resume_vacuum(self) -> str: + """ + 恢复真空泵运行 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + if self._pump_state != "Paused": + self._status = "Error: Pump not paused" + return "Error" + + if self._power_state != "On": + self._status = "Error: Power Off" + return "Error" + + self._pump_state = "Running" + self._status = "Resuming Vacuum Pump" + self._start_vacuum_operation() + + return "Success" + + def vent_to_atmosphere(self) -> str: + """ + 通大气 - 将真空度恢复到大气压 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._target_vacuum = self._max_vacuum # 设置为大气压 + self._status = "Venting to Atmosphere" + return "Success" + + def emergency_stop(self) -> str: + """ + 紧急停止 + + Returns: + str: 操作结果状态 ("Success", "Error") + """ + self._status = "Emergency Stop" + self._pump_state = "Stopped" + self._stop_vacuum_operation() + self._pump_speed = 0.0 + + return "Success" + + # ==================== 内部控制方法 ==================== + + def _start_vacuum_operation(self): + """ + 启动真空操作线程 + + 这个方法启动一个后台线程来模拟真空泵的实际运行过程。 + """ + with self._thread_lock: + if not self._running and self._power_state == "On": + self._running = True + self._vacuum_thread = threading.Thread(target=self._vacuum_operation_loop) + self._vacuum_thread.daemon = True + self._vacuum_thread.start() + + def _stop_vacuum_operation(self): + """ + 停止真空操作线程 + + 安全地停止后台运行线程并等待其完成。 + """ + with self._thread_lock: + self._running = False + if self._vacuum_thread and self._vacuum_thread.is_alive(): + self._vacuum_thread.join(timeout=2.0) + + def _vacuum_operation_loop(self): + """ + 真空操作主循环 + + 这个方法在后台线程中运行,模拟真空泵的工作过程: + 1. 检查电源状态和运行状态 + 2. 如果泵状态为 "Running",根据目标真空调整泵速和真空度 + 3. 否则等待 + """ + while self._running and self._power_state == "On": + try: + with self._thread_lock: + # 只有泵状态为 Running 时才进行更新 + if self._pump_state == "Running": + vacuum_diff = self._vacuum_level - self._target_vacuum + + if abs(vacuum_diff) < 1.0: # 真空度接近目标值 + self._status = "At Target Vacuum" + self._pump_speed = self._max_pump_speed * 0.2 # 维持真空的最小泵速 + elif vacuum_diff > 0: # 需要抽真空(降低压力) + self._status = "Pumping Down" + if vacuum_diff > 500: + self._pump_speed = self._max_pump_speed + elif vacuum_diff > 100: + self._pump_speed = self._max_pump_speed * 0.8 + elif vacuum_diff > 50: + self._pump_speed = self._max_pump_speed * 0.6 + else: + self._pump_speed = self._max_pump_speed * 0.4 + + # 根据泵速和效率计算真空降幅 + pump_rate = (self._pump_speed / self._max_pump_speed) * self._pump_efficiency / 100.0 + vacuum_reduction = pump_rate * 10.0 # 每秒最大降低10 mbar + self._vacuum_level = max(self._target_vacuum, self._vacuum_level - vacuum_reduction) + else: # 目标真空度高于当前值,需要通气 + self._status = "Venting" + self._pump_speed = 0.0 + self._vacuum_level = min(self._target_vacuum, self._vacuum_level + 5.0) + + # 限制真空度范围 + self._vacuum_level = max(self._min_vacuum, min(self._max_vacuum, self._vacuum_level)) + else: + # 当泵状态不是 Running 时,可保持原状态 + self._status = "Vacuum Pump Not Running" + # 释放锁后等待1秒钟 + time.sleep(1.0) + except Exception as e: + with self._thread_lock: + self._status = f"Error in vacuum operation: {str(e)}" + break + + # 循环结束后的清理工作 + if self._pump_state == "Running": + self._status = "Idle" + # 停止泵后,真空度逐渐回升到大气压 + while self._vacuum_level < self._max_vacuum * 0.9: + with self._thread_lock: + self._vacuum_level += 2.0 + time.sleep(0.1) + + def get_status_info(self) -> dict: + """ + 获取完整的设备状态信息 + + Returns: + dict: 包含所有设备状态的字典 + """ + return { + "status": self._status, + "power_state": self._power_state, + "pump_state": self._pump_state, + "vacuum_level": self._vacuum_level, + "target_vacuum": self._target_vacuum, + "pump_speed": self._pump_speed, + "pump_efficiency": self._pump_efficiency, + "max_pump_speed": self._max_pump_speed + } + + +# 用于测试的主函数 +if __name__ == "__main__": + vacuum = MockVacuum() + + # 测试基本功能 + print("启动真空泵测试...") + vacuum.power_control("On") + print(f"初始状态: {vacuum.get_status_info()}") + + # 设置目标真空度并启动 + vacuum.set_vacuum_level(10.0) # 设置为10mbar + vacuum.start_vacuum() + + # 模拟运行15秒 + for i in range(15): + time.sleep(1) + print(f"第{i+1}秒: 真空度={vacuum.vacuum_level:.1f}mbar, 泵速={vacuum.pump_speed:.1f}L/s, 状态={vacuum.status}") + # 测试通大气 + print("测试通大气...") + vacuum.vent_to_atmosphere() + + # 继续运行5秒观察通大气过程 + for i in range(5): + time.sleep(1) + print(f"通大气第{i+1}秒: 真空度={vacuum.vacuum_level:.1f}mbar, 状态={vacuum.status}") + + vacuum.emergency_stop() + print("测试完成") \ No newline at end of file diff --git a/unilabos/devices/Mock/MockVacuum/__init__.py b/unilabos/devices/Mock/MockVacuum/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/unilabos/registry/devices/MockVacuum.yaml b/unilabos/registry/devices/MockVacuum.yaml new file mode 100644 index 0000000..d5a5b83 --- /dev/null +++ b/unilabos/registry/devices/MockVacuum.yaml @@ -0,0 +1,92 @@ +MockVacuum: + description: Mock Vacuum Pump Device + class: + module: unilabos.devices.Mock.MockVacuum.MockVacuum:MockVacuum + type: python + status_types: + status: String + power_state: String + pump_state: String + vacuum_level: Float64 + target_vacuum: Float64 + pump_speed: Float64 + pump_efficiency: Float64 + max_pump_speed: Float64 + action_value_mappings: + power_control: + type: SendCmd + goal: + command: power_state + feedback: {} + result: + success: success + set_vacuum_level: + type: SendCmd + goal: + command: vacuum_level + feedback: {} + result: + success: success + start_vacuum: + type: SendCmd + goal: {} + feedback: {} + result: + success: success + stop_vacuum: + type: SendCmd + goal: {} + feedback: {} + result: + success: success + pause_vacuum: + type: SendCmd + goal: {} + feedback: {} + result: + success: success + resume_vacuum: + type: SendCmd + goal: {} + feedback: {} + result: + success: success + vent_to_atmosphere: + type: SendCmd + goal: {} + feedback: {} + result: + success: success + schema: + type: object + properties: + status: + type: string + description: Current status of the vacuum pump + power_state: + type: string + description: Power state (On/Off) + pump_state: + type: string + description: Pump operation state (Running/Stopped/Paused) + vacuum_level: + type: number + description: Current vacuum level in mbar + target_vacuum: + type: number + description: Target vacuum level in mbar + pump_speed: + type: number + description: Current pump speed in L/s + pump_efficiency: + type: number + description: Pump efficiency percentage + max_pump_speed: + type: number + description: Maximum pump speed in L/s + required: + - status + - power_state + - pump_state + - vacuum_level + additionalProperties: false \ No newline at end of file