Add Device MockVacuum

This commit is contained in:
KCFeng425
2025-06-05 15:12:35 +08:00
parent 4e636b91b4
commit 02b4750637
4 changed files with 531 additions and 0 deletions

View File

@@ -0,0 +1,31 @@
{
"nodes": [
{
"id": "MockVacuum1",
"name": "模拟真空泵",
"children": [],
"parent": null,
"type": "device",
"class": "MockVacuum",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"power_state": "Off",
"pump_state": "Stopped",
"vacuum_level": 1013.25,
"target_vacuum": 50.0,
"pump_speed": 0.0,
"pump_efficiency": 95.0,
"max_pump_speed": 100.0
}
}
],
"links": []
}

View File

@@ -0,0 +1,408 @@
import time
import threading
class MockVacuum:
"""
模拟真空泵设备类
这个类模拟了一个实验室真空泵的行为,包括真空度控制、
压力监测、运行状态管理等功能。参考了现有的 VacuumPumpMock 实现。
"""
def __init__(self, port: str = "MOCK"):
"""
初始化MockVacuum实例
Args:
port (str): 设备端口,默认为"MOCK"表示模拟设备
"""
self.port = port
# 设备基本状态属性
self._status: str = "Idle" # 设备状态Idle, Running, Error, Stopped
self._power_state: str = "Off" # 电源状态On, Off
self._pump_state: str = "Stopped" # 泵运行状态Running, Stopped, Paused
# 真空相关属性
self._vacuum_level: float = 1013.25 # 当前真空度 (mbar) - 大气压开始
self._target_vacuum: float = 50.0 # 目标真空度 (mbar)
self._min_vacuum: float = 1.0 # 最小真空度 (mbar)
self._max_vacuum: float = 1013.25 # 最大真空度 (mbar) - 大气压
# 泵性能相关属性
self._pump_speed: float = 0.0 # 泵速 (L/s)
self._max_pump_speed: float = 100.0 # 最大泵速 (L/s)
self._pump_efficiency: float = 95.0 # 泵效率百分比
# 运行控制线程
self._vacuum_thread = None
self._running = False
self._thread_lock = threading.Lock()
# ==================== 状态属性 ====================
# 这些属性会被Uni-Lab系统自动识别并定时对外广播
@property
def status(self) -> str:
"""
设备状态 - 会被自动识别的设备属性
Returns:
str: 当前设备状态 (Idle, Running, Error, Stopped)
"""
return self._status
@property
def power_state(self) -> str:
"""
电源状态
Returns:
str: 电源状态 (On, Off)
"""
return self._power_state
@property
def pump_state(self) -> str:
"""
泵运行状态
Returns:
str: 泵状态 (Running, Stopped, Paused)
"""
return self._pump_state
@property
def vacuum_level(self) -> float:
"""
当前真空度
Returns:
float: 当前真空度 (mbar)
"""
return self._vacuum_level
@property
def target_vacuum(self) -> float:
"""
目标真空度
Returns:
float: 目标真空度 (mbar)
"""
return self._target_vacuum
@property
def pump_speed(self) -> float:
"""
泵速
Returns:
float: 泵速 (L/s)
"""
return self._pump_speed
@property
def pump_efficiency(self) -> float:
"""
泵效率
Returns:
float: 泵效率百分比
"""
return self._pump_efficiency
@property
def max_pump_speed(self) -> float:
"""
最大泵速
Returns:
float: 最大泵速 (L/s)
"""
return self._max_pump_speed
# ==================== 设备控制方法 ====================
# 这些方法需要在注册表中添加会作为ActionServer接受控制指令
def power_control(self, power_state: str = "On") -> str:
"""
电源控制方法
Args:
power_state (str): 电源状态,可选值:"On", "Off"
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if power_state not in ["On", "Off"]:
self._status = "Error: Invalid power state"
return "Error"
self._power_state = power_state
if power_state == "On":
self._status = "Power On"
self._start_vacuum_operation()
else:
self._status = "Power Off"
self.stop_vacuum()
return "Success"
def set_vacuum_level(self, vacuum_level: float) -> str:
"""
设置目标真空度
Args:
vacuum_level (float): 目标真空度 (mbar)
Returns:
str: 操作结果状态 ("Success", "Error")
"""
try:
vacuum_level = float(vacuum_level)
except ValueError:
self._status = "Error: Invalid vacuum level"
return "Error"
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if vacuum_level < self._min_vacuum or vacuum_level > self._max_vacuum:
self._status = f"Error: Vacuum level out of range ({self._min_vacuum}-{self._max_vacuum})"
return "Error"
self._target_vacuum = vacuum_level
self._status = "Setting Vacuum Level"
return "Success"
def start_vacuum(self) -> str:
"""
启动真空泵
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
self._pump_state = "Running"
self._status = "Starting Vacuum Pump"
self._start_vacuum_operation()
return "Success"
def stop_vacuum(self) -> str:
"""
停止真空泵
Returns:
str: 操作结果状态 ("Success", "Error")
"""
self._pump_state = "Stopped"
self._status = "Stopping Vacuum Pump"
self._stop_vacuum_operation()
self._pump_speed = 0.0
return "Success"
def pause_vacuum(self) -> str:
"""
暂停真空泵
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._pump_state != "Running":
self._status = "Error: Pump not running"
return "Error"
self._pump_state = "Paused"
self._status = "Vacuum Pump Paused"
self._stop_vacuum_operation()
return "Success"
def resume_vacuum(self) -> str:
"""
恢复真空泵运行
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._pump_state != "Paused":
self._status = "Error: Pump not paused"
return "Error"
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
self._pump_state = "Running"
self._status = "Resuming Vacuum Pump"
self._start_vacuum_operation()
return "Success"
def vent_to_atmosphere(self) -> str:
"""
通大气 - 将真空度恢复到大气压
Returns:
str: 操作结果状态 ("Success", "Error")
"""
self._target_vacuum = self._max_vacuum # 设置为大气压
self._status = "Venting to Atmosphere"
return "Success"
def emergency_stop(self) -> str:
"""
紧急停止
Returns:
str: 操作结果状态 ("Success", "Error")
"""
self._status = "Emergency Stop"
self._pump_state = "Stopped"
self._stop_vacuum_operation()
self._pump_speed = 0.0
return "Success"
# ==================== 内部控制方法 ====================
def _start_vacuum_operation(self):
"""
启动真空操作线程
这个方法启动一个后台线程来模拟真空泵的实际运行过程。
"""
with self._thread_lock:
if not self._running and self._power_state == "On":
self._running = True
self._vacuum_thread = threading.Thread(target=self._vacuum_operation_loop)
self._vacuum_thread.daemon = True
self._vacuum_thread.start()
def _stop_vacuum_operation(self):
"""
停止真空操作线程
安全地停止后台运行线程并等待其完成。
"""
with self._thread_lock:
self._running = False
if self._vacuum_thread and self._vacuum_thread.is_alive():
self._vacuum_thread.join(timeout=2.0)
def _vacuum_operation_loop(self):
"""
真空操作主循环
这个方法在后台线程中运行,模拟真空泵的工作过程:
1. 检查电源状态和运行状态
2. 如果泵状态为 "Running",根据目标真空调整泵速和真空度
3. 否则等待
"""
while self._running and self._power_state == "On":
try:
with self._thread_lock:
# 只有泵状态为 Running 时才进行更新
if self._pump_state == "Running":
vacuum_diff = self._vacuum_level - self._target_vacuum
if abs(vacuum_diff) < 1.0: # 真空度接近目标值
self._status = "At Target Vacuum"
self._pump_speed = self._max_pump_speed * 0.2 # 维持真空的最小泵速
elif vacuum_diff > 0: # 需要抽真空(降低压力)
self._status = "Pumping Down"
if vacuum_diff > 500:
self._pump_speed = self._max_pump_speed
elif vacuum_diff > 100:
self._pump_speed = self._max_pump_speed * 0.8
elif vacuum_diff > 50:
self._pump_speed = self._max_pump_speed * 0.6
else:
self._pump_speed = self._max_pump_speed * 0.4
# 根据泵速和效率计算真空降幅
pump_rate = (self._pump_speed / self._max_pump_speed) * self._pump_efficiency / 100.0
vacuum_reduction = pump_rate * 10.0 # 每秒最大降低10 mbar
self._vacuum_level = max(self._target_vacuum, self._vacuum_level - vacuum_reduction)
else: # 目标真空度高于当前值,需要通气
self._status = "Venting"
self._pump_speed = 0.0
self._vacuum_level = min(self._target_vacuum, self._vacuum_level + 5.0)
# 限制真空度范围
self._vacuum_level = max(self._min_vacuum, min(self._max_vacuum, self._vacuum_level))
else:
# 当泵状态不是 Running 时,可保持原状态
self._status = "Vacuum Pump Not Running"
# 释放锁后等待1秒钟
time.sleep(1.0)
except Exception as e:
with self._thread_lock:
self._status = f"Error in vacuum operation: {str(e)}"
break
# 循环结束后的清理工作
if self._pump_state == "Running":
self._status = "Idle"
# 停止泵后,真空度逐渐回升到大气压
while self._vacuum_level < self._max_vacuum * 0.9:
with self._thread_lock:
self._vacuum_level += 2.0
time.sleep(0.1)
def get_status_info(self) -> dict:
"""
获取完整的设备状态信息
Returns:
dict: 包含所有设备状态的字典
"""
return {
"status": self._status,
"power_state": self._power_state,
"pump_state": self._pump_state,
"vacuum_level": self._vacuum_level,
"target_vacuum": self._target_vacuum,
"pump_speed": self._pump_speed,
"pump_efficiency": self._pump_efficiency,
"max_pump_speed": self._max_pump_speed
}
# 用于测试的主函数
if __name__ == "__main__":
vacuum = MockVacuum()
# 测试基本功能
print("启动真空泵测试...")
vacuum.power_control("On")
print(f"初始状态: {vacuum.get_status_info()}")
# 设置目标真空度并启动
vacuum.set_vacuum_level(10.0) # 设置为10mbar
vacuum.start_vacuum()
# 模拟运行15秒
for i in range(15):
time.sleep(1)
print(f"{i+1}秒: 真空度={vacuum.vacuum_level:.1f}mbar, 泵速={vacuum.pump_speed:.1f}L/s, 状态={vacuum.status}")
# 测试通大气
print("测试通大气...")
vacuum.vent_to_atmosphere()
# 继续运行5秒观察通大气过程
for i in range(5):
time.sleep(1)
print(f"通大气第{i+1}秒: 真空度={vacuum.vacuum_level:.1f}mbar, 状态={vacuum.status}")
vacuum.emergency_stop()
print("测试完成")

View File

@@ -0,0 +1,92 @@
MockVacuum:
description: Mock Vacuum Pump Device
class:
module: unilabos.devices.Mock.MockVacuum.MockVacuum:MockVacuum
type: python
status_types:
status: String
power_state: String
pump_state: String
vacuum_level: Float64
target_vacuum: Float64
pump_speed: Float64
pump_efficiency: Float64
max_pump_speed: Float64
action_value_mappings:
power_control:
type: SendCmd
goal:
command: power_state
feedback: {}
result:
success: success
set_vacuum_level:
type: SendCmd
goal:
command: vacuum_level
feedback: {}
result:
success: success
start_vacuum:
type: SendCmd
goal: {}
feedback: {}
result:
success: success
stop_vacuum:
type: SendCmd
goal: {}
feedback: {}
result:
success: success
pause_vacuum:
type: SendCmd
goal: {}
feedback: {}
result:
success: success
resume_vacuum:
type: SendCmd
goal: {}
feedback: {}
result:
success: success
vent_to_atmosphere:
type: SendCmd
goal: {}
feedback: {}
result:
success: success
schema:
type: object
properties:
status:
type: string
description: Current status of the vacuum pump
power_state:
type: string
description: Power state (On/Off)
pump_state:
type: string
description: Pump operation state (Running/Stopped/Paused)
vacuum_level:
type: number
description: Current vacuum level in mbar
target_vacuum:
type: number
description: Target vacuum level in mbar
pump_speed:
type: number
description: Current pump speed in L/s
pump_efficiency:
type: number
description: Pump efficiency percentage
max_pump_speed:
type: number
description: Maximum pump speed in L/s
required:
- status
- power_state
- pump_state
- vacuum_level
additionalProperties: false