mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-07 23:45:10 +00:00
区分了虚拟仪器中的八通阀和电磁阀,添加了两个阀门的驱动
This commit is contained in:
111
unilabos/devices/virtual/virtual_multiway_valve.py
Normal file
111
unilabos/devices/virtual/virtual_multiway_valve.py
Normal file
@@ -0,0 +1,111 @@
|
||||
import time
|
||||
from typing import Union
|
||||
|
||||
|
||||
class VirtualMultiwayValve:
|
||||
"""
|
||||
虚拟八通阀门 - 可将一个输入切换到8个输出中的任意一个
|
||||
"""
|
||||
def __init__(self, port: str = "VIRTUAL", positions: int = 8):
|
||||
self.port = port
|
||||
self.max_positions = positions
|
||||
|
||||
# 状态属性
|
||||
self._status = "Idle"
|
||||
self._valve_state = "Ready"
|
||||
self._current_position = 1
|
||||
self._target_position = 1
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
return self._status
|
||||
|
||||
@property
|
||||
def valve_state(self) -> str:
|
||||
return self._valve_state
|
||||
|
||||
@property
|
||||
def current_position(self) -> int:
|
||||
return self._current_position
|
||||
|
||||
@property
|
||||
def target_position(self) -> int:
|
||||
return self._target_position
|
||||
|
||||
def get_current_position(self) -> int:
|
||||
"""获取当前阀门位置"""
|
||||
return self._current_position
|
||||
|
||||
def set_position(self, position: Union[int, str]):
|
||||
"""
|
||||
设置阀门位置
|
||||
|
||||
Args:
|
||||
position: 目标位置 (1-8)
|
||||
"""
|
||||
try:
|
||||
pos = int(position)
|
||||
if pos < 1 or pos > self.max_positions:
|
||||
raise ValueError(f"Position must be between 1 and {self.max_positions}")
|
||||
|
||||
self._status = "Busy"
|
||||
self._valve_state = "Moving"
|
||||
self._target_position = pos
|
||||
|
||||
# 模拟阀门切换时间
|
||||
switch_time = abs(self._current_position - pos) * 0.5 # 每个位置0.5秒
|
||||
time.sleep(switch_time)
|
||||
|
||||
self._current_position = pos
|
||||
self._status = "Idle"
|
||||
self._valve_state = "Ready"
|
||||
|
||||
return f"Position set to {pos}"
|
||||
|
||||
except ValueError as e:
|
||||
self._status = "Error"
|
||||
self._valve_state = "Error"
|
||||
return f"Error: {str(e)}"
|
||||
|
||||
def open(self):
|
||||
"""打开阀门 - 对于多通阀门,相当于设置到位置1"""
|
||||
return self.set_position(1)
|
||||
|
||||
def close(self):
|
||||
"""关闭阀门 - 对于多通阀门,相当于设置到关闭位置"""
|
||||
self._status = "Busy"
|
||||
self._valve_state = "Closing"
|
||||
time.sleep(0.5)
|
||||
|
||||
self._current_position = 0 # 0表示关闭状态
|
||||
self._status = "Idle"
|
||||
self._valve_state = "Closed"
|
||||
|
||||
return "Valve closed"
|
||||
|
||||
def get_valve_position(self) -> int:
|
||||
"""获取阀门位置 - 兼容性方法"""
|
||||
return self._current_position
|
||||
|
||||
def is_at_position(self, position: int) -> bool:
|
||||
"""检查是否在指定位置"""
|
||||
return self._current_position == position
|
||||
|
||||
def get_available_positions(self) -> list:
|
||||
"""获取可用位置列表"""
|
||||
return list(range(1, self.max_positions + 1))
|
||||
|
||||
def reset(self):
|
||||
"""重置阀门到初始位置"""
|
||||
return self.set_position(1)
|
||||
|
||||
def get_info(self) -> dict:
|
||||
"""获取阀门信息"""
|
||||
return {
|
||||
"port": self.port,
|
||||
"max_positions": self.max_positions,
|
||||
"current_position": self._current_position,
|
||||
"target_position": self._target_position,
|
||||
"status": self._status,
|
||||
"valve_state": self._valve_state
|
||||
}
|
||||
151
unilabos/devices/virtual/virtual_solenoid_valve.py
Normal file
151
unilabos/devices/virtual/virtual_solenoid_valve.py
Normal file
@@ -0,0 +1,151 @@
|
||||
import time
|
||||
from typing import Union
|
||||
|
||||
|
||||
class VirtualSolenoidValve:
|
||||
"""
|
||||
虚拟电磁阀门 - 简单的开关型阀门,只有开启和关闭两个状态
|
||||
"""
|
||||
def __init__(self, port: str = "VIRTUAL", voltage: float = 12.0, response_time: float = 0.1):
|
||||
self.port = port
|
||||
self.voltage = voltage
|
||||
self.response_time = response_time
|
||||
|
||||
# 状态属性
|
||||
self._status = "Idle"
|
||||
self._valve_state = "Closed" # "Open" or "Closed"
|
||||
self._is_open = False
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
return self._status
|
||||
|
||||
@property
|
||||
def valve_state(self) -> str:
|
||||
return self._valve_state
|
||||
|
||||
@property
|
||||
def is_open(self) -> bool:
|
||||
return self._is_open
|
||||
|
||||
def get_valve_position(self) -> str:
|
||||
"""获取阀门位置状态"""
|
||||
return "OPEN" if self._is_open else "CLOSED"
|
||||
|
||||
def set_valve_position(self, position: Union[str, bool]):
|
||||
"""
|
||||
设置阀门位置
|
||||
|
||||
Args:
|
||||
position: "OPEN"/"CLOSED" 或 True/False
|
||||
"""
|
||||
self._status = "Busy"
|
||||
|
||||
# 模拟阀门响应时间
|
||||
time.sleep(self.response_time)
|
||||
|
||||
if isinstance(position, str):
|
||||
target_open = position.upper() == "OPEN"
|
||||
elif isinstance(position, bool):
|
||||
target_open = position
|
||||
else:
|
||||
self._status = "Error"
|
||||
return "Error: Invalid position"
|
||||
|
||||
self._is_open = target_open
|
||||
self._valve_state = "Open" if target_open else "Closed"
|
||||
self._status = "Idle"
|
||||
|
||||
return f"Valve {'opened' if target_open else 'closed'}"
|
||||
|
||||
def open(self):
|
||||
"""打开电磁阀"""
|
||||
self._status = "Busy"
|
||||
time.sleep(self.response_time)
|
||||
|
||||
self._is_open = True
|
||||
self._valve_state = "Open"
|
||||
self._status = "Idle"
|
||||
|
||||
return "Valve opened"
|
||||
|
||||
def close(self):
|
||||
"""关闭电磁阀"""
|
||||
self._status = "Busy"
|
||||
time.sleep(self.response_time)
|
||||
|
||||
self._is_open = False
|
||||
self._valve_state = "Closed"
|
||||
self._status = "Idle"
|
||||
|
||||
return "Valve closed"
|
||||
|
||||
def set_state(self, state: Union[bool, str]):
|
||||
"""
|
||||
设置阀门状态
|
||||
|
||||
Args:
|
||||
state: True/False 或 "open"/"close"
|
||||
"""
|
||||
if isinstance(state, bool):
|
||||
return self.open() if state else self.close()
|
||||
elif isinstance(state, str):
|
||||
if state.lower() in ["open", "on", "true", "1"]:
|
||||
return self.open()
|
||||
elif state.lower() in ["close", "closed", "off", "false", "0"]:
|
||||
return self.close()
|
||||
else:
|
||||
self._status = "Error"
|
||||
return "Error: Invalid state"
|
||||
else:
|
||||
self._status = "Error"
|
||||
return "Error: Invalid state type"
|
||||
|
||||
def toggle(self):
|
||||
"""切换阀门状态"""
|
||||
if self._is_open:
|
||||
return self.close()
|
||||
else:
|
||||
return self.open()
|
||||
|
||||
def is_closed(self) -> bool:
|
||||
"""检查阀门是否关闭"""
|
||||
return not self._is_open
|
||||
|
||||
def get_state(self) -> dict:
|
||||
"""获取阀门完整状态"""
|
||||
return {
|
||||
"port": self.port,
|
||||
"voltage": self.voltage,
|
||||
"response_time": self.response_time,
|
||||
"is_open": self._is_open,
|
||||
"valve_state": self._valve_state,
|
||||
"status": self._status,
|
||||
"position": self.get_valve_position()
|
||||
}
|
||||
|
||||
def reset(self):
|
||||
"""重置阀门到关闭状态"""
|
||||
return self.close()
|
||||
|
||||
def test_cycle(self, cycles: int = 3, delay: float = 1.0):
|
||||
"""
|
||||
测试阀门开关循环
|
||||
|
||||
Args:
|
||||
cycles: 循环次数
|
||||
delay: 每次开关间隔时间(秒)
|
||||
"""
|
||||
results = []
|
||||
for i in range(cycles):
|
||||
# 打开
|
||||
result_open = self.open()
|
||||
results.append(f"Cycle {i+1} - Open: {result_open}")
|
||||
time.sleep(delay)
|
||||
|
||||
# 关闭
|
||||
result_close = self.close()
|
||||
results.append(f"Cycle {i+1} - Close: {result_close}")
|
||||
time.sleep(delay)
|
||||
|
||||
return results
|
||||
Reference in New Issue
Block a user