mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-07 23:45:10 +00:00
Match mock device with action
This commit is contained in:
@@ -3,25 +3,11 @@ import threading
|
||||
|
||||
|
||||
class MockStirrer:
|
||||
"""
|
||||
模拟搅拌器设备类
|
||||
|
||||
这个类模拟了一个实验室搅拌器的行为,包括搅拌速度控制、
|
||||
温度监测、加热控制等功能。参考了现有的 HeaterStirrer_DaLong 实现。
|
||||
"""
|
||||
|
||||
def __init__(self, port: str = "MOCK"):
|
||||
"""
|
||||
初始化MockStirrer实例
|
||||
|
||||
Args:
|
||||
port (str): 设备端口,默认为"MOCK"表示模拟设备
|
||||
"""
|
||||
self.port = port
|
||||
|
||||
# 设备基本状态属性
|
||||
self._status: str = "Idle" # 设备状态:Idle, Running, Error, Stopped
|
||||
self._power_state: str = "Off" # 电源状态:On, Off
|
||||
|
||||
# 搅拌相关属性
|
||||
self._stir_speed: float = 0.0 # 当前搅拌速度 (rpm)
|
||||
@@ -46,52 +32,18 @@ class MockStirrer:
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
"""
|
||||
设备状态 - 会被自动识别的设备属性
|
||||
|
||||
Returns:
|
||||
str: 当前设备状态 (Idle, Running, Error, Stopped)
|
||||
"""
|
||||
return self._status
|
||||
|
||||
@property
|
||||
def power_state(self) -> str:
|
||||
"""
|
||||
电源状态
|
||||
|
||||
Returns:
|
||||
str: 电源状态 (On, Off)
|
||||
"""
|
||||
return self._power_state
|
||||
|
||||
@property
|
||||
def stir_speed(self) -> float:
|
||||
"""
|
||||
当前搅拌速度
|
||||
|
||||
Returns:
|
||||
float: 当前搅拌速度 (rpm)
|
||||
"""
|
||||
return self._stir_speed
|
||||
|
||||
@property
|
||||
def target_stir_speed(self) -> float:
|
||||
"""
|
||||
目标搅拌速度
|
||||
|
||||
Returns:
|
||||
float: 目标搅拌速度 (rpm)
|
||||
"""
|
||||
return self._target_stir_speed
|
||||
|
||||
@property
|
||||
def stir_state(self) -> str:
|
||||
"""
|
||||
搅拌状态
|
||||
|
||||
Returns:
|
||||
str: 搅拌状态 (Running, Stopped)
|
||||
"""
|
||||
return self._stir_state
|
||||
|
||||
@property
|
||||
@@ -116,86 +68,26 @@ class MockStirrer:
|
||||
|
||||
@property
|
||||
def heating_state(self) -> str:
|
||||
"""
|
||||
加热状态
|
||||
|
||||
Returns:
|
||||
str: 加热状态 (On, Off)
|
||||
"""
|
||||
return self._heating_state
|
||||
|
||||
@property
|
||||
def heating_power(self) -> float:
|
||||
"""
|
||||
加热功率
|
||||
|
||||
Returns:
|
||||
float: 加热功率百分比 (0-100)
|
||||
"""
|
||||
return self._heating_power
|
||||
|
||||
@property
|
||||
def max_stir_speed(self) -> float:
|
||||
"""
|
||||
最大搅拌速度
|
||||
|
||||
Returns:
|
||||
float: 最大搅拌速度 (rpm)
|
||||
"""
|
||||
return self._max_stir_speed
|
||||
|
||||
@property
|
||||
def max_temperature(self) -> float:
|
||||
"""
|
||||
最大温度
|
||||
|
||||
Returns:
|
||||
float: 最大温度 (°C)
|
||||
"""
|
||||
return self._max_temperature
|
||||
|
||||
# ==================== 设备控制方法 ====================
|
||||
# 这些方法需要在注册表中添加,会作为ActionServer接受控制指令
|
||||
|
||||
def power_control(self, power_state: str = "On") -> str:
|
||||
"""
|
||||
电源控制方法
|
||||
|
||||
Args:
|
||||
power_state (str): 电源状态,可选值:"On", "Off"
|
||||
|
||||
Returns:
|
||||
str: 操作结果状态 ("Success", "Error")
|
||||
"""
|
||||
if power_state not in ["On", "Off"]:
|
||||
self._status = "Error: Invalid power state"
|
||||
return "Error"
|
||||
|
||||
self._power_state = power_state
|
||||
|
||||
if power_state == "On":
|
||||
self._status = "Power On"
|
||||
self._start_operation()
|
||||
else:
|
||||
self._status = "Power Off"
|
||||
self.stop_all_operations()
|
||||
|
||||
return "Success"
|
||||
|
||||
def set_stir_speed(self, speed: float) -> str:
|
||||
"""
|
||||
设置搅拌速度
|
||||
|
||||
Args:
|
||||
speed (float): 目标搅拌速度 (rpm)
|
||||
|
||||
Returns:
|
||||
str: 操作结果状态 ("Success", "Error")
|
||||
"""
|
||||
speed = float(speed) # 确保传入的速度是浮点数
|
||||
if self._power_state != "On":
|
||||
self._status = "Error: Power Off"
|
||||
return "Error"
|
||||
|
||||
if speed < 0 or speed > self._max_stir_speed:
|
||||
self._status = f"Error: Speed out of range (0-{self._max_stir_speed})"
|
||||
@@ -213,19 +105,7 @@ class MockStirrer:
|
||||
return "Success"
|
||||
|
||||
def set_temperature(self, temperature: float) -> str:
|
||||
"""
|
||||
设置目标温度
|
||||
|
||||
Args:
|
||||
temperature (float): 目标温度 (°C)
|
||||
|
||||
Returns:
|
||||
str: 操作结果状态 ("Success", "Error")
|
||||
"""
|
||||
temperature = float(temperature) # 确保传入的温度是浮点数
|
||||
if self._power_state != "On":
|
||||
self._status = "Error: Power Off"
|
||||
return "Error"
|
||||
|
||||
if temperature < 0 or temperature > self._max_temperature:
|
||||
self._status = f"Error: Temperature out of range (0-{self._max_temperature})"
|
||||
@@ -237,15 +117,6 @@ class MockStirrer:
|
||||
return "Success"
|
||||
|
||||
def start_stirring(self) -> str:
|
||||
"""
|
||||
启动搅拌
|
||||
|
||||
Returns:
|
||||
str: 操作结果状态 ("Success", "Error")
|
||||
"""
|
||||
if self._power_state != "On":
|
||||
self._status = "Error: Power Off"
|
||||
return "Error"
|
||||
|
||||
if self._target_stir_speed <= 0:
|
||||
self._status = "Error: No target speed set"
|
||||
@@ -256,30 +127,12 @@ class MockStirrer:
|
||||
return "Success"
|
||||
|
||||
def stop_stirring(self) -> str:
|
||||
"""
|
||||
停止搅拌
|
||||
|
||||
Returns:
|
||||
str: 操作结果状态 ("Success", "Error")
|
||||
"""
|
||||
self._stir_state = "Stopped"
|
||||
self._target_stir_speed = 0.0
|
||||
self._status = "Stirring Stopped"
|
||||
return "Success"
|
||||
|
||||
def heating_control(self, heating_state: str = "On") -> str:
|
||||
"""
|
||||
加热控制
|
||||
|
||||
Args:
|
||||
heating_state (str): 加热状态,可选值:"On", "Off"
|
||||
|
||||
Returns:
|
||||
str: 操作结果状态 ("Success", "Error")
|
||||
"""
|
||||
if self._power_state != "On":
|
||||
self._status = "Error: Power Off"
|
||||
return "Error"
|
||||
|
||||
if heating_state not in ["On", "Off"]:
|
||||
self._status = "Error: Invalid heating state"
|
||||
@@ -296,12 +149,6 @@ class MockStirrer:
|
||||
return "Success"
|
||||
|
||||
def stop_all_operations(self) -> str:
|
||||
"""
|
||||
停止所有操作
|
||||
|
||||
Returns:
|
||||
str: 操作结果状态 ("Success", "Error")
|
||||
"""
|
||||
self._stir_state = "Stopped"
|
||||
self._heating_state = "Off"
|
||||
self._stop_operation()
|
||||
@@ -325,13 +172,8 @@ class MockStirrer:
|
||||
# ==================== 内部控制方法 ====================
|
||||
|
||||
def _start_operation(self):
|
||||
"""
|
||||
启动操作线程
|
||||
|
||||
这个方法启动一个后台线程来模拟搅拌器的实际运行过程。
|
||||
"""
|
||||
with self._thread_lock:
|
||||
if not self._running and self._power_state == "On":
|
||||
if not self._running:
|
||||
self._running = True
|
||||
self._operation_thread = threading.Thread(target=self._operation_loop)
|
||||
self._operation_thread.daemon = True
|
||||
@@ -349,15 +191,7 @@ class MockStirrer:
|
||||
self._operation_thread.join(timeout=2.0)
|
||||
|
||||
def _operation_loop(self):
|
||||
"""
|
||||
操作主循环
|
||||
|
||||
这个方法在后台线程中运行,模拟真实搅拌器的工作过程:
|
||||
1. 搅拌速度控制
|
||||
2. 温度控制和加热
|
||||
3. 状态更新
|
||||
"""
|
||||
while self._running and self._power_state == "On":
|
||||
while self._running:
|
||||
try:
|
||||
# 处理搅拌速度控制
|
||||
if self._stir_state == "Running":
|
||||
@@ -431,19 +265,11 @@ class MockStirrer:
|
||||
break
|
||||
|
||||
# 循环结束时的清理工作
|
||||
if self._power_state == "On":
|
||||
self._status = "Idle"
|
||||
|
||||
def get_status_info(self) -> dict:
|
||||
"""
|
||||
获取完整的设备状态信息
|
||||
|
||||
Returns:
|
||||
dict: 包含所有设备状态的字典
|
||||
"""
|
||||
return {
|
||||
"status": self._status,
|
||||
"power_state": self._power_state,
|
||||
"stir_speed": self._stir_speed,
|
||||
"target_stir_speed": self._target_stir_speed,
|
||||
"stir_state": self._stir_state,
|
||||
@@ -462,7 +288,6 @@ if __name__ == "__main__":
|
||||
|
||||
# 测试基本功能
|
||||
print("启动搅拌器测试...")
|
||||
stirrer.power_control("On")
|
||||
print(f"初始状态: {stirrer.get_status_info()}")
|
||||
|
||||
# 设置搅拌速度和温度
|
||||
|
||||
Reference in New Issue
Block a user