Compare commits

...

4 Commits

Author SHA1 Message Date
KCFeng425
fe492bbca9 Add new action 2025-06-09 23:48:03 +08:00
KCFeng425
8c96be4229 Edit mock device yaml 2025-06-09 23:47:41 +08:00
KCFeng425
f947658bbd Match mock device with action 2025-06-09 23:47:23 +08:00
KCFeng425
6d56466897 Edited Mock device json 2025-06-09 23:46:37 +08:00
30 changed files with 1819 additions and 1055 deletions

View File

@@ -1,59 +1,155 @@
{
"nodes": [
{
"id": "MockStirrer1",
"name": "模拟搅拌器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_stirrer",
"position": {
"x": 100,
"y": 100,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"power_state": "Off",
"stir_speed": 0.0,
"target_stir_speed": 0.0,
"stir_state": "Stopped",
"temperature": 25.0,
"target_temperature": 25.0,
"heating_state": "Off",
"heating_power": 0.0,
"max_stir_speed": 2000.0,
"max_temperature": 300.0
}
{
"id": "MockChiller1",
"name": "模拟冷却器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_chiller",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"current_temperature": 25.0,
"target_temperature": 25.0,
"status": "Idle",
"is_cooling": false,
"is_heating": false,
"vessel": "",
"purpose": ""
}
},
{
"id": "MockVacuum1",
"name": "模拟真空泵",
"children": [],
"parent": null,
"type": "device",
"class": "mock_vacuum",
"position": {
"x": 300,
"y": 100,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
{
"id": "MockFilter1",
"name": "模拟过滤器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_filter",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"is_filtering": false,
"flow_rate": 0.0,
"filter_life": 100.0,
"vessel": "",
"filtrate_vessel": "",
"filtered_volume": 0.0,
"target_volume": 0.0,
"progress": 0.0,
"stir": false,
"stir_speed": 0.0,
"temperature": 25.0,
"continue_heatchill": false
}
},
{
"id": "MockHeater1",
"name": "模拟加热器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_heater",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"current_temperature": 25.0,
"target_temperature": 25.0,
"status": "Idle",
"is_heating": false,
"heating_power": 0.0,
"max_temperature": 300.0,
"vessel": "Unknown",
"purpose": "Unknown",
"stir": false,
"stir_speed": 0.0
}
},
{
"id": "MockPump1",
"name": "模拟泵设备",
"children": [],
"parent": null,
"type": "device",
"class": "mock_pump",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"power_state": "Off",
"pump_state": "Stopped",
"vacuum_level": 1013.25,
"target_vacuum": 50.0,
"pump_speed": 0.0,
"pump_efficiency": 95.0,
"max_pump_speed": 100.0
}
"current_device": "MockPump1",
"pump_state": "Stopped",
"flow_rate": 0.0,
"target_flow_rate": 0.0,
"pressure": 0.0,
"total_volume": 0.0,
"max_flow_rate": 100.0,
"max_pressure": 10.0,
"from_vessel": "",
"to_vessel": "",
"transfer_volume": 0.0,
"amount": "",
"transfer_time": 0.0,
"is_viscous": false,
"rinsing_solvent": "",
"rinsing_volume": 0.0,
"rinsing_repeats": 0,
"is_solid": false,
"time_spent": 0.0,
"time_remaining": 0.0
}
},
{
"id": "MockRotavap1",
"name": "模拟旋转蒸发器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_rotavap",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"rotate_state": "Stopped",
"rotate_time": 0.0,
"rotate_speed": 0.0,
"pump_state": "Stopped",
"pump_time": 0.0,
"vacuum_level": 1013.25,
"temperature": 25.0,
"target_temperature": 25.0,
"success": "True"
}
},
{
"id": "MockSeparator1",
@@ -63,8 +159,8 @@
"type": "device",
"class": "mock_separator",
"position": {
"x": 500,
"y": 100,
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
@@ -72,163 +168,128 @@
},
"data": {
"status": "Idle",
"power_state": "Off",
"settling_time": 0.0,
"valve_state": "Closed",
"shake_time": 0.0,
"shake_status": "Not Shaking"
"shake_status": "Not Shaking",
"current_device": "MockSeparator1",
"purpose": "",
"product_phase": "",
"from_vessel": "",
"separation_vessel": "",
"to_vessel": "",
"waste_phase_to_vessel": "",
"solvent": "",
"solvent_volume": 0.0,
"through": "",
"repeats": 1,
"stir_time": 0.0,
"stir_speed": 0.0,
"time_spent": 0.0,
"time_remaining": 0.0
}
},
{
"id": "MockSolenoidValve1",
"name": "模拟电磁阀",
"children": [],
"parent": null,
"type": "device",
"class": "mock_solenoid_valve",
"position": {
"x": 700,
"y": 100,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"valve_status": "Closed"
}
{
"id": "MockSolenoidValve1",
"name": "模拟电磁阀",
"children": [],
"parent": null,
"type": "device",
"class": "mock_solenoid_valve",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"valve_status": "Closed"
}
},
{
"id": "MockRotavap1",
"name": "模拟旋转蒸发器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_rotavap",
"position": {
"x": 100,
"y": 300,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
{
"id": "MockStirrer1NEW",
"name": "模拟搅拌器(new)",
"children": [],
"parent": null,
"type": "device",
"class": "mock_stirrer_new",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"power_state": "Off",
"rotate_state": "Stopped",
"rotate_time": 0.0,
"rotate_speed": 0.0,
"pump_state": "Stopped",
"pump_time": 0.0,
"vacuum_level": 1013.25,
"temperature": 25.0,
"target_temperature": 25.0,
"success": "True"
}
"vessel": "",
"purpose": "",
"stir_speed": 0.0,
"target_stir_speed": 0.0,
"stir_state": "Stopped",
"stir_time": 0.0,
"settling_time": 0.0,
"progress": 0.0,
"max_stir_speed": 2000.0
}
},
{
"id": "MockHeater1",
"name": "模拟加热器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_heater",
"position": {
"x": 300,
"y": 300,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"current_temperature": 25.0,
"target_temperature": 25.0,
"status": "Idle",
"power_on": false,
"is_heating": false,
"heating_power": 0.0,
"max_temperature": 300.0
}
{
"id": "MockStirrer1",
"name": "模拟搅拌器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_stirrer",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"stir_speed": 0.0,
"target_stir_speed": 0.0,
"stir_state": "Stopped",
"temperature": 25.0,
"target_temperature": 25.0,
"heating_state": "Off",
"heating_power": 0.0,
"max_stir_speed": 2000.0,
"max_temperature": 300.0
}
},
{
"id": "MockPump1",
"name": "模拟泵设备",
"children": [],
"parent": null,
"type": "device",
"class": "mock_pump",
"position": {
"x": 500,
"y": 300,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"power_state": "Off",
"pump_state": "Stopped",
"flow_rate": 0.0,
"target_flow_rate": 0.0,
"pressure": 0.0,
"total_volume": 0.0,
"direction": "Forward",
"max_flow_rate": 100.0,
"max_pressure": 10.0
}
},
{
"id": "MockChiller1",
"name": "模拟冷却器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_chiller",
"position": {
"x": 700,
"y": 300,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"current_temperature": 25.0,
"target_temperature": 25.0,
"status": "Idle",
"power_on": false,
"is_cooling": false,
"is_heating": false
}
},
{
"id": "MockFilter1",
"name": "模拟过滤器",
"children": [],
"parent": null,
"type": "device",
"class": "mock_filter",
"position": {
"x": 400,
"y": 500,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"is_filtering": false,
"filter_efficiency": 95.0,
"flow_rate": 0.0,
"pressure_drop": 0.0,
"filter_life": 100.0,
"power_on": false
}
{
"id": "MockVacuum1",
"name": "模拟真空泵",
"children": [],
"parent": null,
"type": "device",
"class": "mock_vacuum",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"power_state": "Off",
"pump_state": "Stopped",
"vacuum_level": 1013.25,
"target_vacuum": 50.0,
"pump_speed": 0.0,
"pump_efficiency": 95.0,
"max_pump_speed": 100.0
}
}
],
"links": []

View File

@@ -19,9 +19,10 @@
"current_temperature": 25.0,
"target_temperature": 25.0,
"status": "Idle",
"power_on": false,
"is_cooling": false,
"is_heating": false
"is_heating": false,
"vessel": "",
"purpose": ""
}
}
],

View File

@@ -18,11 +18,17 @@
"data": {
"status": "Idle",
"is_filtering": false,
"filter_efficiency": 95.0,
"flow_rate": 0.0,
"pressure_drop": 0.0,
"filter_life": 100.0,
"power_on": false
"flow_rate": 0.0,
"filter_life": 100.0,
"vessel": "",
"filtrate_vessel": "",
"filtered_volume": 0.0,
"target_volume": 0.0,
"progress": 0.0,
"stir": false,
"stir_speed": 0.0,
"temperature": 25.0,
"continue_heatchill": false
}
}
],

View File

@@ -19,10 +19,13 @@
"current_temperature": 25.0,
"target_temperature": 25.0,
"status": "Idle",
"power_on": false,
"is_heating": false,
"heating_power": 0.0,
"max_temperature": 300.0
"max_temperature": 300.0,
"vessel": "Unknown",
"purpose": "Unknown",
"stir": false,
"stir_speed": 0.0
}
}
],

View File

@@ -16,16 +16,27 @@
"port": "MOCK"
},
"data": {
"status": "Idle",
"power_state": "Off",
"status": "Idle",
"current_device": "MockPump1",
"pump_state": "Stopped",
"flow_rate": 0.0,
"target_flow_rate": 0.0,
"pressure": 0.0,
"total_volume": 0.0,
"direction": "Forward",
"max_flow_rate": 100.0,
"max_pressure": 10.0
"max_pressure": 10.0,
"from_vessel": "",
"to_vessel": "",
"transfer_volume": 0.0,
"amount": "",
"transfer_time": 0.0,
"is_viscous": false,
"rinsing_solvent": "",
"rinsing_volume": 0.0,
"rinsing_repeats": 0,
"is_solid": false,
"time_spent": 0.0,
"time_remaining": 0.0
}
}
],

View File

@@ -17,7 +17,6 @@
},
"data": {
"status": "Idle",
"power_state": "Off",
"rotate_state": "Stopped",
"rotate_time": 0.0,
"rotate_speed": 0.0,

View File

@@ -17,11 +17,25 @@
},
"data": {
"status": "Idle",
"power_state": "Off",
"settling_time": 0.0,
"valve_state": "Closed",
"shake_time": 0.0,
"shake_status": "Not Shaking"
"shake_status": "Not Shaking",
"current_device": "MockSeparator1",
"purpose": "",
"product_phase": "",
"from_vessel": "",
"separation_vessel": "",
"to_vessel": "",
"waste_phase_to_vessel": "",
"solvent": "",
"solvent_volume": 0.0,
"through": "",
"repeats": 1,
"stir_time": 0.0,
"stir_speed": 0.0,
"time_spent": 0.0,
"time_remaining": 0.0
}
}
],

View File

@@ -16,8 +16,7 @@
"port": "MOCK"
},
"data": {
"status": "Idle",
"power_state": "Off",
"status": "Idle",
"stir_speed": 0.0,
"target_stir_speed": 0.0,
"stir_state": "Stopped",

View File

@@ -0,0 +1,33 @@
{
"nodes": [
{
"id": "MockStirrer1COPY",
"name": "模拟搅拌器(Copy)",
"children": [],
"parent": null,
"type": "device",
"class": "mock_stirrer_new",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"vessel": "",
"purpose": "",
"stir_speed": 0.0,
"target_stir_speed": 0.0,
"stir_state": "Stopped",
"stir_time": 0.0,
"settling_time": 0.0,
"progress": 0.0,
"max_stir_speed": 2000.0
}
}
],
"links": []
}

View File

@@ -10,11 +10,15 @@ class MockChiller:
self._status: str = "Idle"
self._is_cooling: bool = False
self._is_heating: bool = False
self._power_on: bool = False
self._vessel = "Unknown"
self._purpose = "Unknown"
# 模拟温度变化的线程
self._temperature_thread = None
self._running = False
self._running = True
self._temperature_thread = threading.Thread(target=self._temperature_control_loop)
self._temperature_thread.daemon = True
self._temperature_thread.start()
@property
def current_temperature(self) -> float:
@@ -31,11 +35,6 @@ class MockChiller:
"""设备状态 - 会被自动识别的设备属性"""
return self._status
@property
def power_on(self) -> bool:
"""电源状态"""
return self._power_on
@property
def is_cooling(self) -> bool:
"""是否正在冷却"""
@@ -46,17 +45,22 @@ class MockChiller:
"""是否正在加热"""
return self._is_heating
def set_temperature(self, temperature: float):
"""设置目标温度 - 需要在注册表添加的设备动作"""
if not self._power_on:
self._status = "Error: Power Off"
return False
@property
def vessel(self) -> str:
"""当前操作的容器名称"""
return self._vessel
# 将传入温度转换为 float并限制在允许范围内
temperature = float(temperature)
self._target_temperature = temperature
@property
def purpose(self) -> str:
"""当前操作目的"""
return self._purpose
def heat_chill_start(self, vessel: str, temp: float, purpose: str):
"""设置目标温度并记录容器和目的"""
self._vessel = str(vessel)
self._purpose = str(purpose)
self._target_temperature = float(temp)
# 立即更新状态
diff = self._target_temperature - self._current_temperature
if abs(diff) < 0.1:
self._status = "At Target Temperature"
@@ -71,31 +75,37 @@ class MockChiller:
self._is_heating = True
self._is_cooling = False
# 启动温度控制
self._start_temperature_control()
return True
def power_on_off(self, power_state: str):
"""开关机控制"""
if power_state == "on":
self._power_on = True
# 不在这里直接设置状态和加热/制冷标志
self._start_temperature_control()
else:
self._power_on = False
self._status = "Power Off"
self._stop_temperature_control()
self._is_cooling = False
self._is_heating = False
def heat_chill_stop(self, vessel: str):
"""停止加热/制冷"""
if vessel != self._vessel:
return {"success": False, "status": f"Wrong vessel: expected {self._vessel}, got {vessel}"}
# 停止温度控制线程,锁定当前温度
self._stop_temperature_control()
# 更新状态
self._status = "Stopped"
self._is_cooling = False
self._is_heating = False
# 重新启动线程但保持温度
self._running = True
self._temperature_thread = threading.Thread(target=self._temperature_control_loop)
self._temperature_thread.daemon = True
self._temperature_thread.start()
return {"success": True, "status": self._status}
def _start_temperature_control(self):
"""启动温度控制线程"""
if self._power_on: # 移除 not self._running 检查
self._running = True
if self._temperature_thread is None or not self._temperature_thread.is_alive():
self._temperature_thread = threading.Thread(target=self._temperature_control_loop)
self._temperature_thread.daemon = True
self._temperature_thread.start()
self._running = True
if self._temperature_thread is None or not self._temperature_thread.is_alive():
self._temperature_thread = threading.Thread(target=self._temperature_control_loop)
self._temperature_thread.daemon = True
self._temperature_thread.start()
def _stop_temperature_control(self):
"""停止温度控制"""
@@ -105,30 +115,30 @@ class MockChiller:
def _temperature_control_loop(self):
"""温度控制循环 - 模拟真实冷却器的温度变化"""
while self._running and self._power_on:
while self._running:
# 如果状态是 Stopped不改变温度
if self._status == "Stopped":
time.sleep(1.0)
continue
temp_diff = self._target_temperature - self._current_temperature
if abs(temp_diff) < 0.1: # 将判断范围从0.5改小到0.1
if abs(temp_diff) < 0.1:
self._status = "At Target Temperature"
self._is_cooling = False
self._is_heating = False
elif temp_diff < 0: # 需要冷却
elif temp_diff < 0:
self._status = "Cooling"
self._is_cooling = True
self._is_heating = False
# 模拟冷却过程每秒降低0.5度
self._current_temperature -= 0.5
else: # 需要加热
else:
self._status = "Heating"
self._is_heating = True
self._is_cooling = False
# 模拟加热过程每秒升高0.3度
self._current_temperature += 0.3
# 限制温度范围
self._current_temperature = max(-20.0, min(80.0, self._current_temperature))
time.sleep(1.0) # 每秒更新一次
time.sleep(1.0)
def emergency_stop(self):
"""紧急停止"""
@@ -143,9 +153,10 @@ class MockChiller:
"current_temperature": self._current_temperature,
"target_temperature": self._target_temperature,
"status": self._status,
"power_on": self._power_on,
"is_cooling": self._is_cooling,
"is_heating": self._is_heating,
"vessel": self._vessel,
"purpose": self._purpose,
}
@@ -155,12 +166,8 @@ if __name__ == "__main__":
# 测试基本功能
print("启动冷却器测试...")
chiller.power_on_off("on")
print(f"初始状态: {chiller.get_status_info()}")
# 设置目标温度为5度
chiller.set_temperature(5.0)
# 模拟运行10秒
for i in range(10):
time.sleep(1)

View File

@@ -4,64 +4,112 @@ import threading
class MockFilter:
def __init__(self, port: str = "MOCK"):
# 基本参数初始化
self.port = port
self._status: str = "Idle"
self._is_filtering: bool = False
self._filter_efficiency: float = 95.0 # 过滤效率百分比
self._flow_rate: float = 0.0 # 流速 L/min
self._pressure_drop: float = 0.0 # 压降 Pa
self._filter_life: float = 100.0 # 滤芯寿命百分比
self._power_on: bool = False
# 模拟过滤过程的线程
# 过滤性能参数
self._flow_rate: float = 1.0 # 流速(L/min)
self._pressure_drop: float = 0.0 # 压降(Pa)
self._filter_life: float = 100.0 # 滤芯寿命(%)
# 过滤操作参数
self._vessel: str = "" # 源容器
self._filtrate_vessel: str = "" # 目标容器
self._stir: bool = False # 是否搅拌
self._stir_speed: float = 0.0 # 搅拌速度
self._temperature: float = 25.0 # 温度(℃)
self._continue_heatchill: bool = False # 是否继续加热/制冷
self._target_volume: float = 0.0 # 目标过滤体积(L)
self._filtered_volume: float = 0.0 # 已过滤体积(L)
self._progress: float = 0.0 # 过滤进度(%)
# 线程控制
self._filter_thread = None
self._running = False
@property
def status(self) -> str:
"""设备状态 - 会被自动识别的设备属性"""
return self._status
@property
def is_filtering(self) -> bool:
"""是否正在过滤"""
return self._is_filtering
@property
def filter_efficiency(self) -> float:
"""过滤效率"""
return self._filter_efficiency
@property
def flow_rate(self) -> float:
"""流速"""
return self._flow_rate
@property
def pressure_drop(self) -> float:
"""压降"""
return self._pressure_drop
@property
def filter_life(self) -> float:
"""滤芯寿命"""
return self._filter_life
# 新增 property
@property
def vessel(self) -> str:
return self._vessel
@property
def power_on(self) -> bool:
"""电源状态"""
return self._power_on
def filtrate_vessel(self) -> str:
return self._filtrate_vessel
def start_filtering(self, flow_rate: float = 1.0):
"""开始过滤 - 需要在注册表添加的设备动作"""
if not self._power_on:
self._status = "Error: Power Off"
return False
@property
def filtered_volume(self) -> float:
return self._filtered_volume
self._flow_rate = flow_rate
@property
def progress(self) -> float:
return self._progress
@property
def stir(self) -> bool:
return self._stir
@property
def stir_speed(self) -> float:
return self._stir_speed
@property
def temperature(self) -> float:
return self._temperature
@property
def continue_heatchill(self) -> bool:
return self._continue_heatchill
@property
def target_volume(self) -> float:
return self._target_volume
def filter(self, vessel: str, filtrate_vessel: str, stir: bool = False, stir_speed: float = 0.0, temp: float = 25.0, continue_heatchill: bool = False, volume: float = 0.0) -> dict:
"""新的过滤操作"""
# 停止任何正在进行的过滤
if self._is_filtering:
self.stop_filtering()
# 验证参数
if volume <= 0:
return {"success": False, "message": "Target volume must be greater than 0"}
# 设置新的过滤参数
self._vessel = vessel
self._filtrate_vessel = filtrate_vessel
self._stir = stir
self._stir_speed = stir_speed
self._temperature = temp
self._continue_heatchill = continue_heatchill
self._target_volume = volume
# 重置过滤状态
self._filtered_volume = 0.0
self._progress = 0.0
self._status = "Starting Filter"
# 启动过滤过程
self._flow_rate = 1.0 # 设置默认流速
self._start_filter_process()
return True
return {"success": True, "message": "Filter started"}
def stop_filtering(self):
"""停止过滤"""
@@ -69,31 +117,18 @@ class MockFilter:
self._stop_filter_process()
self._flow_rate = 0.0
self._is_filtering = False
self._status = "Idle"
self._status = "Stopped"
return True
def power_on_off(self, power_state: str):
"""开关机控制"""
if power_state == "on":
self._power_on = True
self._status = "Power On"
else:
self._power_on = False
self._status = "Power Off"
self._stop_filter_process()
self._is_filtering = False
self._flow_rate = 0.0
def replace_filter(self):
"""更换滤芯"""
self._filter_life = 100.0
self._filter_efficiency = 95.0
self._status = "Filter Replaced"
return True
def _start_filter_process(self):
"""启动过滤过程线程"""
if not self._running and self._power_on:
if not self._running:
self._running = True
self._is_filtering = True
self._filter_thread = threading.Thread(target=self._filter_loop)
@@ -107,24 +142,50 @@ class MockFilter:
self._filter_thread.join(timeout=1.0)
def _filter_loop(self):
"""过滤过程循环 - 模拟真实过滤器的工作过程"""
while self._running and self._power_on and self._is_filtering:
self._status = "Filtering"
# 模拟滤芯磨损
if self._filter_life > 0:
self._filter_life -= 0.1 # 每秒减少0.1%寿命
# 根据滤芯寿命调整效率和压降
life_factor = self._filter_life / 100.0
self._filter_efficiency = 95.0 * life_factor + 50.0 * (1 - life_factor)
self._pressure_drop = 100.0 + (200.0 * (1 - life_factor)) # 压降随磨损增加
# 检查滤芯是否需要更换
if self._filter_life <= 10.0:
self._status = "Filter Needs Replacement"
time.sleep(1.0) # 每秒更新一次
"""过滤进程主循环"""
update_interval = 1.0 # 更新间隔(秒)
while self._running and self._is_filtering:
try:
self._status = "Filtering"
# 计算这一秒过滤的体积 (L/min -> L/s)
volume_increment = (self._flow_rate / 60.0) * update_interval
# 更新已过滤体积
self._filtered_volume += volume_increment
# 更新进度 (避免除零错误)
if self._target_volume > 0:
self._progress = min(100.0, (self._filtered_volume / self._target_volume) * 100.0)
# 更新滤芯寿命 (每过滤1L减少0.5%寿命)
self._filter_life = max(0.0, self._filter_life - (volume_increment * 0.5))
# 更新压降 (根据滤芯寿命和流速动态计算)
life_factor = self._filter_life / 100.0 # 将寿命转换为0-1的因子
flow_factor = self._flow_rate / 2.0 # 将流速标准化(假设2L/min是标准流速)
base_pressure = 100.0 # 基础压降
# 压降随滤芯寿命降低而增加,随流速增加而增加
self._pressure_drop = base_pressure * (2 - life_factor) * flow_factor
# 检查是否完成目标体积
if self._target_volume > 0 and self._filtered_volume >= self._target_volume:
self._status = "Completed"
self._progress = 100.0
self.stop_filtering()
break
# 检查滤芯寿命
if self._filter_life <= 10.0:
self._status = "Filter Needs Replacement"
time.sleep(update_interval)
except Exception as e:
print(f"Error in filter loop: {e}")
self.emergency_stop()
break
def emergency_stop(self):
"""紧急停止"""
@@ -134,15 +195,21 @@ class MockFilter:
self._flow_rate = 0.0
def get_status_info(self) -> dict:
"""获取完整状态信息"""
"""扩展的状态信息"""
return {
"status": self._status,
"is_filtering": self._is_filtering,
"filter_efficiency": self._filter_efficiency,
"flow_rate": self._flow_rate,
"pressure_drop": self._pressure_drop,
"filter_life": self._filter_life,
"power_on": self._power_on,
"vessel": self._vessel,
"filtrate_vessel": self._filtrate_vessel,
"filtered_volume": self._filtered_volume,
"target_volume": self._target_volume,
"progress": self._progress,
"temperature": self._temperature,
"stir": self._stir,
"stir_speed": self._stir_speed
}
@@ -152,17 +219,15 @@ if __name__ == "__main__":
# 测试基本功能
print("启动过滤器测试...")
filter_device.power_on_off("on")
print(f"初始状态: {filter_device.get_status_info()}")
# 开始过滤
filter_device.start_filtering(2.0)
# 模拟运行10秒
for i in range(10):
time.sleep(1)
print(
f"{i+1}秒: 效率={filter_device.filter_efficiency:.1f}%, "
f"{i+1}秒: "
f"寿命={filter_device.filter_life:.1f}%, 状态={filter_device.status}"
)

View File

@@ -1,7 +1,6 @@
import time
import threading
class MockHeater:
def __init__(self, port: str = "MOCK"):
self.port = port
@@ -9,13 +8,21 @@ class MockHeater:
self._target_temperature: float = 25.0
self._status: str = "Idle"
self._is_heating: bool = False
self._power_on: bool = False
self._heating_power: float = 0.0 # 加热功率百分比 0-100
self._max_temperature: float = 300.0 # 最大加热温度
# 新增加的属性
self._vessel: str = "Unknown"
self._purpose: str = "Unknown"
self._stir: bool = False
self._stir_speed: float = 0.0
# 模拟加热过程的线程
self._heating_thread = None
self._running = False
self._running = True
self._heating_thread = threading.Thread(target=self._heating_control_loop)
self._heating_thread.daemon = True
self._heating_thread.start()
@property
def current_temperature(self) -> float:
@@ -32,11 +39,6 @@ class MockHeater:
"""设备状态 - 会被自动识别的设备属性"""
return self._status
@property
def power_on(self) -> bool:
"""电源状态"""
return self._power_on
@property
def is_heating(self) -> bool:
"""是否正在加热"""
@@ -51,6 +53,79 @@ class MockHeater:
def max_temperature(self) -> float:
"""最大加热温度"""
return self._max_temperature
@property
def vessel(self) -> str:
"""当前操作的容器名称"""
return self._vessel
@property
def purpose(self) -> str:
"""操作目的"""
return self._purpose
@property
def stir(self) -> bool:
"""是否搅拌"""
return self._stir
@property
def stir_speed(self) -> float:
"""搅拌速度"""
return self._stir_speed
def heat_chill_start(self, vessel: str, temp: float, purpose: str) -> dict:
"""开始加热/制冷过程"""
self._vessel = str(vessel)
self._purpose = str(purpose)
self._target_temperature = float(temp)
diff = self._target_temperature - self._current_temperature
if abs(diff) < 0.1:
self._status = "At Target Temperature"
self._is_heating = False
elif diff > 0:
self._status = "Heating"
self._is_heating = True
else:
self._status = "Cooling Down"
self._is_heating = False
return {"success": True, "status": self._status}
def heat_chill_stop(self, vessel: str) -> dict:
"""停止加热/制冷"""
if vessel != self._vessel:
return {"success": False, "status": f"Wrong vessel: expected {self._vessel}, got {vessel}"}
self._status = "Stopped"
self._is_heating = False
self._heating_power = 0.0
return {"success": True, "status": self._status}
def heat_chill(self, vessel: str, temp: float, time: float,
stir: bool = False, stir_speed: float = 0.0,
purpose: str = "Unknown") -> dict:
"""完整的加热/制冷控制"""
self._vessel = str(vessel)
self._target_temperature = float(temp)
self._purpose = str(purpose)
self._stir = stir
self._stir_speed = stir_speed
diff = self._target_temperature - self._current_temperature
if abs(diff) < 0.1:
self._status = "At Target Temperature"
self._is_heating = False
elif diff > 0:
self._status = "Heating"
self._is_heating = True
else:
self._status = "Cooling Down"
self._is_heating = False
return {"success": True, "status": self._status}
def set_temperature(self, temperature: float):
"""设置目标温度 - 需要在注册表添加的设备动作"""
@@ -60,10 +135,6 @@ class MockHeater:
self._status = "Error: Invalid temperature value"
return False
if not self._power_on:
self._status = "Error: Power Off"
return False
if temperature > self._max_temperature:
self._status = f"Error: Temperature exceeds maximum ({self._max_temperature}°C)"
return False
@@ -83,33 +154,12 @@ class MockHeater:
self._status = "Error: Invalid power value"
return False
if not self._power_on:
self._status = "Error: Power Off"
return False
self._heating_power = max(0.0, min(100.0, power)) # 限制在0-100%
return True
def power_on_off(self, power_state: str):
"""开关机控制,接收字符串命令 "On""Off" """
power_state = power_state.capitalize()
if power_state not in ["On", "Off"]:
self._status = "Error: Invalid power state"
return "Error"
self._power_on = True if power_state == "On" else False
if self._power_on:
self._status = "Power On"
self._start_heating_control()
else:
self._status = "Power Off"
self._stop_heating_control()
self._is_heating = False
self._heating_power = 0.0
return "Success"
def _start_heating_control(self):
"""启动加热控制线程"""
if not self._running and self._power_on:
if not self._running:
self._running = True
self._heating_thread = threading.Thread(target=self._heating_control_loop)
self._heating_thread.daemon = True
@@ -122,41 +172,31 @@ class MockHeater:
self._heating_thread.join(timeout=1.0)
def _heating_control_loop(self):
"""加热控制循环 - 模拟真实加热器的温度变化"""
while self._running and self._power_on:
"""加热控制循环"""
while self._running:
# 如果状态是 Stopped不改变温度
if self._status == "Stopped":
time.sleep(1.0)
continue
temp_diff = self._target_temperature - self._current_temperature
if abs(temp_diff) < 0.5: # 温度接近目标值
if abs(temp_diff) < 0.1:
self._status = "At Target Temperature"
self._is_heating = False
self._heating_power = 10.0 # 维持温度的最小功率
elif temp_diff > 0: # 需要加热
self._heating_power = 10.0
elif temp_diff > 0:
self._status = "Heating"
self._is_heating = True
# 根据温差调整加热功率
if temp_diff > 50:
self._heating_power = 100.0
elif temp_diff > 20:
self._heating_power = 80.0
elif temp_diff > 10:
self._heating_power = 60.0
else:
self._heating_power = 40.0
# 模拟加热过程,加热速度与功率成正比
heating_rate = self._heating_power / 100.0 * 2.0 # 最大每秒升温2度
self._current_temperature += heating_rate
else: # 目标温度低于当前温度,自然冷却
self._heating_power = min(100.0, abs(temp_diff) * 2)
self._current_temperature += 0.5
else:
self._status = "Cooling Down"
self._is_heating = False
self._heating_power = 0.0
# 模拟自然冷却每秒降低0.2度
self._current_temperature -= 0.2
# 限制温度范围
self._current_temperature = max(20.0, min(self._max_temperature, self._current_temperature))
time.sleep(1.0) # 每秒更新一次
time.sleep(1.0)
def emergency_stop(self):
"""紧急停止"""
@@ -171,32 +211,37 @@ class MockHeater:
"current_temperature": self._current_temperature,
"target_temperature": self._target_temperature,
"status": self._status,
"power_on": self._power_on,
"is_heating": self._is_heating,
"heating_power": self._heating_power,
"max_temperature": self._max_temperature,
"vessel": self._vessel,
"purpose": self._purpose,
"stir": self._stir,
"stir_speed": self._stir_speed
}
# 用于测试的主函数
if __name__ == "__main__":
heater = MockHeater()
# 测试基本功能
print("启动加热器测试...")
heater.power_on_off("On")
print(f"初始状态: {heater.get_status_info()}")
# 设置目标温度为80度
heater.set_temperature(80.0)
# 模拟运行15秒
for i in range(15):
time.sleep(1)
print(
f"{i+1}秒: 当前温度={heater.current_temperature:.1f}°C, 功率={heater.heating_power:.1f}%, "
f"状态={heater.status}"
)
try:
for i in range(15):
time.sleep(1)
status = heater.get_status_info()
print(
f"\r温度: {status['current_temperature']:.1f}°C / {status['target_temperature']:.1f}°C | "
f"功率: {status['heating_power']:.1f}% | 状态: {status['status']}",
end=""
)
except KeyboardInterrupt:
heater.emergency_stop()
print("\n测试被手动停止")
heater.emergency_stop()
print("测试完成")
print("\n测试完成")

View File

@@ -1,28 +1,14 @@
import time
import threading
from datetime import datetime, timedelta
class MockPump:
"""
模拟泵设备类
这个类模拟了一个实验室泵设备的行为,包括流量控制、压力监测、
运行状态管理等功能。所有的控制参数都使用字符串类型以提供更好的
可读性和扩展性。
"""
def __init__(self, port: str = "MOCK"):
"""
初始化MockPump实例
Args:
port (str): 设备端口,默认为"MOCK"表示模拟设备
"""
self.port = port
# 设备基本状态属性
self._current_device = "MockPump1" # 设备标识符
self._status: str = "Idle" # 设备状态Idle, Running, Error, Stopped
self._power_state: str = "Off" # 电源状态On, Off
self._pump_state: str = "Stopped" # 泵运行状态Running, Stopped, Paused
# 流量相关属性
@@ -35,24 +21,39 @@ class MockPump:
self._pressure: float = 0.0 # 当前压力 (bar)
self._max_pressure: float = 10.0 # 最大压力 (bar)
# 方向控制属性
self._direction: str = "Forward" # 泵方向Forward, Reverse
# 运行控制线程
self._pump_thread = None
self._running = False
self._thread_lock = threading.Lock()
# 新增 PumpTransfer 相关属性
self._from_vessel: str = ""
self._to_vessel: str = ""
self._transfer_volume: float = 0.0
self._amount: str = ""
self._transfer_time: float = 0.0
self._is_viscous: bool = False
self._rinsing_solvent: str = ""
self._rinsing_volume: float = 0.0
self._rinsing_repeats: int = 0
self._is_solid: bool = False
# 时间追踪
self._start_time: datetime = None
self._time_spent: timedelta = timedelta()
self._time_remaining: timedelta = timedelta()
# ==================== 状态属性 ====================
# 这些属性会被Uni-Lab系统自动识别并定时对外广播
@property
def status(self) -> str:
return self._status
@property
def power_state(self) -> str:
return self._power_state
def current_device(self) -> str:
"""当前设备标识符"""
return self._current_device
@property
def pump_state(self) -> str:
@@ -74,10 +75,6 @@ class MockPump:
def total_volume(self) -> float:
return self._total_volume
@property
def direction(self) -> str:
return self._direction
@property
def max_flow_rate(self) -> float:
return self._max_flow_rate
@@ -85,115 +82,121 @@ class MockPump:
@property
def max_pressure(self) -> float:
return self._max_pressure
# 添加新的属性访问器
@property
def from_vessel(self) -> str:
return self._from_vessel
@property
def to_vessel(self) -> str:
return self._to_vessel
@property
def transfer_volume(self) -> float:
return self._transfer_volume
@property
def amount(self) -> str:
return self._amount
@property
def transfer_time(self) -> float:
return self._transfer_time
@property
def is_viscous(self) -> bool:
return self._is_viscous
@property
def rinsing_solvent(self) -> str:
return self._rinsing_solvent
@property
def rinsing_volume(self) -> float:
return self._rinsing_volume
@property
def rinsing_repeats(self) -> int:
return self._rinsing_repeats
@property
def is_solid(self) -> bool:
return self._is_solid
# 修改这两个属性装饰器
@property
def time_spent(self) -> float:
"""已用时间(秒)"""
if isinstance(self._time_spent, timedelta):
return self._time_spent.total_seconds()
return float(self._time_spent)
@property
def time_remaining(self) -> float:
"""剩余时间(秒)"""
if isinstance(self._time_remaining, timedelta):
return self._time_remaining.total_seconds()
return float(self._time_remaining)
# ==================== 设备控制方法 ====================
# 这些方法需要在注册表中添加会作为ActionServer接受控制指令
def power_control(self, power_state: str = "On") -> str:
"""
电源控制方法
Args:
power_state (str): 电源状态,可选值:"On", "Off"
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if power_state not in ["On", "Off"]:
self._status = "Error: Invalid power state"
return "Error"
self._power_state = power_state
if power_state == "On":
self._status = "Power On"
else:
self._status = "Power Off"
# 关机时停止所有运行
self.stop_pump()
return "Success"
def set_flow_rate(self, flow_rate: float) -> str:
"""
设置目标流速
Args:
flow_rate (float): 目标流速 (mL/min)
Returns:
str: 操作结果状态 ("Success", "Error")
"""
try:
flow_rate = float(flow_rate)
except ValueError:
self._status = "Error: Invalid flow rate"
return "Error"
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if flow_rate < 0 or flow_rate > self._max_flow_rate:
self._status = f"Error: Flow rate out of range (0-{self._max_flow_rate})"
return "Error"
self._target_flow_rate = flow_rate
self._status = "Setting Flow Rate"
# 如果设置了非零流速,自动启动泵
if flow_rate > 0:
# 自动切换泵状态为 "Running" 以触发操作循环
self._pump_state = "Running"
self._start_pump_operation()
else:
self.stop_pump()
return "Success"
def start_pump(self) -> str:
"""
启动泵运行
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if self._target_flow_rate <= 0:
self._status = "Error: No target flow rate set"
return "Error"
self._pump_state = "Running"
self._status = "Starting Pump"
self._start_pump_operation()
return "Success"
def stop_pump(self) -> str:
"""
停止泵运行
Returns:
str: 操作结果状态 ("Success", "Error")
"""
self._pump_state = "Stopped"
self._status = "Stopping Pump"
def pump_transfer(self, from_vessel: str, to_vessel: str, volume: float,
amount: str = "", time: float = 0.0, viscous: bool = False,
rinsing_solvent: str = "", rinsing_volume: float = 0.0,
rinsing_repeats: int = 0, solid: bool = False) -> dict:
"""Execute pump transfer operation"""
# Stop any existing operation first
self._stop_pump_operation()
self._flow_rate = 0.0
self._pressure = 0.0
# Set transfer parameters
self._from_vessel = from_vessel
self._to_vessel = to_vessel
self._transfer_volume = float(volume)
self._amount = amount
self._transfer_time = float(time)
self._is_viscous = viscous
self._rinsing_solvent = rinsing_solvent
self._rinsing_volume = float(rinsing_volume)
self._rinsing_repeats = int(rinsing_repeats)
self._is_solid = solid
return "Success"
# Calculate flow rate
if self._transfer_time > 0 and self._transfer_volume > 0:
self._target_flow_rate = (self._transfer_volume / self._transfer_time) * 60.0
else:
self._target_flow_rate = 10.0 if not self._is_viscous else 5.0
# Reset timers and counters
self._start_time = datetime.now()
self._time_spent = timedelta()
self._time_remaining = timedelta(seconds=self._transfer_time)
self._total_volume = 0.0
self._flow_rate = 0.0
# Start pump operation
self._pump_state = "Running"
self._status = "Starting Transfer"
self._running = True
# Start pump operation thread
self._pump_thread = threading.Thread(target=self._pump_operation_loop)
self._pump_thread.daemon = True
self._pump_thread.start()
# Wait briefly to ensure thread starts
time.sleep(0.1)
return {
"success": True,
"status": self._status,
"current_device": self._current_device,
"time_spent": 0.0,
"time_remaining": float(self._transfer_time)
}
def pause_pump(self) -> str:
"""
暂停泵运行
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._pump_state != "Running":
self._status = "Error: Pump not running"
return "Error"
@@ -205,73 +208,23 @@ class MockPump:
return "Success"
def resume_pump(self) -> str:
"""
恢复泵运行
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._pump_state != "Paused":
self._status = "Error: Pump not paused"
return "Error"
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
self._pump_state = "Running"
self._status = "Resuming Pump"
self._start_pump_operation()
return "Success"
def set_direction(self, direction: str = "Forward") -> str:
"""
设置泵方向
Args:
direction (str): 泵方向,可选值:"Forward", "Reverse"
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if direction not in ["Forward", "Reverse"]:
self._status = "Error: Invalid direction"
return "Error"
# 如果泵正在运行,需要先停止
was_running = self._pump_state == "Running"
if was_running:
self.stop_pump()
time.sleep(0.5) # 等待停止完成
self._direction = direction
self._status = f"Direction set to {direction}"
# 如果之前在运行,重新启动
if was_running:
self.start_pump()
return "Success"
def reset_volume_counter(self) -> str:
"""
重置累计流量计数器
Returns:
str: 操作结果状态 ("Success", "Error")
"""
self._total_volume = 0.0
self._status = "Volume counter reset"
return "Success"
def emergency_stop(self) -> str:
"""
紧急停止
Returns:
str: 操作结果状态 ("Success", "Error")
"""
self._status = "Emergency Stop"
self._pump_state = "Stopped"
self._stop_pump_operation()
@@ -284,84 +237,72 @@ class MockPump:
# ==================== 内部控制方法 ====================
def _start_pump_operation(self):
"""
启动泵运行线程
这个方法启动一个后台线程来模拟泵的实际运行过程,
包括流速控制、压力变化和累计流量计算。
"""
with self._thread_lock:
if not self._running and self._power_state == "On":
if not self._running:
self._running = True
self._pump_thread = threading.Thread(target=self._pump_operation_loop)
self._pump_thread.daemon = True
self._pump_thread.start()
def _stop_pump_operation(self):
"""
停止泵运行线程
安全地停止后台运行线程并等待其完成。
"""
with self._thread_lock:
self._running = False
if self._pump_thread and self._pump_thread.is_alive():
self._pump_thread.join(timeout=2.0)
def _pump_operation_loop(self):
"""
泵运行主循环
这个方法在后台线程中运行,模拟真实泵的工作过程:
1. 逐步调整流速到目标值
2. 根据流速计算压力
3. 累计流量统计
4. 状态更新
"""
while self._running and self._power_state == "On" and self._pump_state == "Running":
"""泵运行主循环"""
print("Pump operation loop started") # Debug print
while self._running and self._pump_state == "Running":
try:
# 模拟流速调节过程(逐步接近目标流速)
# Calculate flow rate adjustment
flow_diff = self._target_flow_rate - self._flow_rate
if abs(flow_diff) < 0.1: # 流速接近目标值
self._flow_rate = self._target_flow_rate
self._status = "At Target Flow Rate"
# Adjust flow rate more aggressively (50% of difference)
adjustment = flow_diff * 0.5
self._flow_rate += adjustment
# Ensure flow rate is within bounds
self._flow_rate = max(0.1, min(self._max_flow_rate, self._flow_rate))
# Update status based on flow rate
if abs(flow_diff) < 0.1:
self._status = "Running at Target Flow Rate"
else:
# 模拟流速调节每秒调整10%的差值
adjustment = flow_diff * 0.1
self._flow_rate += adjustment
self._status = "Adjusting Flow Rate"
# 确保流速在合理范围内
self._flow_rate = max(0.0, min(self._max_flow_rate, self._flow_rate))
# Calculate volume increment
volume_increment = (self._flow_rate / 60.0) # mL/s
self._total_volume += volume_increment
# 模拟压力变化(压力与流速成正比,加上一些随机波动)
base_pressure = (self._flow_rate / self._max_flow_rate) * self._max_pressure
pressure_variation = 0.1 * base_pressure * (time.time() % 1.0 - 0.5) # ±5%波动
self._pressure = max(0.0, base_pressure + pressure_variation)
# Update time tracking
self._time_spent = datetime.now() - self._start_time
if self._transfer_time > 0:
remaining = self._transfer_time - self._time_spent.total_seconds()
self._time_remaining = timedelta(seconds=max(0, remaining))
# 累计流量计算(每秒更新)
if self._flow_rate > 0:
volume_increment = self._flow_rate / 60.0 # 转换为mL/s
if self._direction == "Reverse":
volume_increment = -volume_increment
self._total_volume += volume_increment
# Check completion
if self._total_volume >= self._transfer_volume:
self._status = "Transfer Completed"
self._pump_state = "Stopped"
self._running = False
break
# 压力保护检查
if self._pressure > self._max_pressure * 0.95:
self._status = "Warning: High Pressure"
# Update pressure
self._pressure = (self._flow_rate / self._max_flow_rate) * self._max_pressure
# 等待1秒后继续下一次循环
print(f"Debug - Flow: {self._flow_rate:.1f}, Volume: {self._total_volume:.1f}") # Debug print
time.sleep(1.0)
except Exception as e:
self._status = f"Error in pump operation: {str(e)}"
print(f"Error in pump operation: {str(e)}")
self._status = "Error in pump operation"
self._pump_state = "Stopped"
self._running = False
break
# 循环结束时的清理工作
if self._pump_state == "Running":
self._status = "Idle"
def get_status_info(self) -> dict:
"""
获取完整的设备状态信息
@@ -370,16 +311,27 @@ class MockPump:
dict: 包含所有设备状态的字典
"""
return {
"status": self._status,
"power_state": self._power_state,
"pump_state": self._pump_state,
"flow_rate": self._flow_rate,
"target_flow_rate": self._target_flow_rate,
"pressure": self._pressure,
"total_volume": self._total_volume,
"direction": self._direction,
"max_flow_rate": self._max_flow_rate,
"max_pressure": self._max_pressure,
"status": self._status,
"pump_state": self._pump_state,
"flow_rate": self._flow_rate,
"target_flow_rate": self._target_flow_rate,
"pressure": self._pressure,
"total_volume": self._total_volume,
"max_flow_rate": self._max_flow_rate,
"max_pressure": self._max_pressure,
"current_device": self._current_device,
"from_vessel": self._from_vessel,
"to_vessel": self._to_vessel,
"transfer_volume": self._transfer_volume,
"amount": self._amount,
"transfer_time": self._transfer_time,
"is_viscous": self._is_viscous,
"rinsing_solvent": self._rinsing_solvent,
"rinsing_volume": self._rinsing_volume,
"rinsing_repeats": self._rinsing_repeats,
"is_solid": self._is_solid,
"time_spent": self._time_spent.total_seconds(),
"time_remaining": self._time_remaining.total_seconds()
}
@@ -389,7 +341,6 @@ if __name__ == "__main__":
# 测试基本功能
print("启动泵设备测试...")
pump.power_control("On")
print(f"初始状态: {pump.get_status_info()}")
# 设置流速并启动
@@ -403,12 +354,7 @@ if __name__ == "__main__":
# 测试方向切换
print("切换泵方向...")
pump.set_direction("Reverse")
# 继续运行5秒
for i in range(5):
time.sleep(1)
print(f"反向第{i+1}秒: 累计流量={pump.total_volume:.1f}mL, 方向={pump.direction}")
pump.emergency_stop()
print("测试完成")

View File

@@ -22,7 +22,6 @@ class MockRotavap:
# 设备基本状态属性
self._status: str = "Idle" # 设备状态Idle, Running, Error, Stopped
self._power_state: str = "Off" # 电源状态On, Off
# 旋转相关属性
self._rotate_state: str = "Stopped" # 旋转状态Running, Stopped
@@ -56,10 +55,6 @@ class MockRotavap:
def status(self) -> str:
return self._status
@property
def power_state(self) -> str:
return self._power_state
@property
def rotate_state(self) -> str:
return self._rotate_state
@@ -95,33 +90,6 @@ class MockRotavap:
# ==================== 设备控制方法 ====================
# 这些方法需要在注册表中添加会作为ActionServer接受控制指令
def power_control(self, power_state: str = "On") -> str:
"""
电源控制方法
Args:
power_state (str): 电源状态,可选值:"On", "Off"
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if power_state not in ["On", "Off"]:
self._status = "Error: Invalid power state"
self.success = "False"
return "Error"
self._power_state = power_state
if power_state == "On":
self._status = "Power On"
self._start_operation()
else:
self._status = "Power Off"
self.stop_all_operations()
self.success = "True"
return "Success"
def set_timer(self, command: str) -> str:
"""
设置定时器 - 兼容现有RotavapOne接口
@@ -132,10 +100,6 @@ class MockRotavap:
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
self.success = "False"
return "Error"
try:
timer = json.loads(command)
@@ -165,9 +129,6 @@ class MockRotavap:
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
self.success = "False"
self._rotate_time = max(0.0, float(time_seconds))
@@ -185,9 +146,6 @@ class MockRotavap:
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
self.success = "False"
self._pump_time = max(0.0, float(time_seconds))
@@ -205,9 +163,6 @@ class MockRotavap:
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if speed < 0 or speed > self._max_rotate_speed:
self._status = f"Error: Speed out of range (0-{self._max_rotate_speed})"
@@ -227,9 +182,6 @@ class MockRotavap:
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if temperature < 0 or temperature > self._max_temperature:
self._status = f"Error: Temperature out of range (0-{self._max_temperature})"
@@ -237,6 +189,10 @@ class MockRotavap:
self._target_temperature = temperature
self._status = "Temperature set"
# 启动操作线程以开始温度控制
self._start_operation()
return "Success"
def start_rotation(self) -> str:
@@ -246,9 +202,6 @@ class MockRotavap:
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if self._rotate_time <= 0:
self._status = "Error: No rotate time set"
@@ -265,9 +218,6 @@ class MockRotavap:
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if self._pump_time <= 0:
self._status = "Error: No pump time set"
@@ -313,7 +263,7 @@ class MockRotavap:
这个方法启动一个后台线程来模拟旋蒸的实际运行过程。
"""
with self._thread_lock:
if not self._running and self._power_state == "On":
if not self._running:
self._running = True
self._operation_thread = threading.Thread(target=self._operation_loop)
self._operation_thread.daemon = True
@@ -340,7 +290,7 @@ class MockRotavap:
3. 真空度控制
4. 状态更新
"""
while self._running and self._power_state == "On":
while self._running:
try:
# 处理旋转时间倒计时
if self._rotate_time > 0:
@@ -385,7 +335,6 @@ class MockRotavap:
break
# 循环结束时的清理工作
if self._power_state == "On":
self._status = "Idle"
def get_status_info(self) -> dict:
@@ -397,7 +346,6 @@ class MockRotavap:
"""
return {
"status": self._status,
"power_state": self._power_state,
"rotate_state": self._rotate_state,
"rotate_time": self._rotate_time,
"rotate_speed": self._rotate_speed,
@@ -416,7 +364,6 @@ if __name__ == "__main__":
# 测试基本功能
print("启动旋转蒸发器测试...")
rotavap.power_control("On")
print(f"初始状态: {rotavap.get_status_info()}")
# 设置定时器

View File

@@ -1,13 +1,12 @@
import time
import threading
from datetime import datetime, timedelta
class MockSeparator:
def __init__(self, port: str = "MOCK"):
self.port = port
# 基本状态属性
self._power_state: str = "Off" # 电源On 或 Off
self._status: str = "Idle" # 当前总体状态
self._valve_state: str = "Closed" # 阀门状态Open 或 Closed
self._settling_time: float = 0.0 # 静置时间(秒)
@@ -19,10 +18,33 @@ class MockSeparator:
# 用于后台模拟 shake 动作
self._operation_thread = None
self._thread_lock = threading.Lock()
self._running = False
# Separate action 相关属性
self._current_device: str = "MockSeparator1"
self._purpose: str = "" # wash or extract
self._product_phase: str = "" # top or bottom
self._from_vessel: str = ""
self._separation_vessel: str = ""
self._to_vessel: str = ""
self._waste_phase_to_vessel: str = ""
self._solvent: str = ""
self._solvent_volume: float = 0.0
self._through: str = ""
self._repeats: int = 1
self._stir_time: float = 0.0
self._stir_speed: float = 0.0
self._time_spent = timedelta()
self._time_remaining = timedelta()
self._start_time = datetime.now() # 添加这一行
@property
def power_state(self) -> str:
return self._power_state
def current_device(self) -> str:
return self._current_device
@property
def purpose(self) -> str:
return self._purpose
@property
def valve_state(self) -> str:
@@ -45,22 +67,144 @@ class MockSeparator:
def shake_status(self) -> str:
with self._thread_lock:
return self._shake_status
@property
def product_phase(self) -> str:
return self._product_phase
def power_control(self, power_state: str) -> str:
"""
电源控制:只接受 "On""Off"
"""
if power_state not in ["On", "Off"]:
self._status = "Error: Invalid power state"
return "Error"
@property
def from_vessel(self) -> str:
return self._from_vessel
self._power_state = power_state
if power_state == "On":
self._status = "Powered On"
else:
self._status = "Powered Off"
self.stop_operations()
return "Success"
@property
def separation_vessel(self) -> str:
return self._separation_vessel
@property
def to_vessel(self) -> str:
return self._to_vessel
@property
def waste_phase_to_vessel(self) -> str:
return self._waste_phase_to_vessel
@property
def solvent(self) -> str:
return self._solvent
@property
def solvent_volume(self) -> float:
return self._solvent_volume
@property
def through(self) -> str:
return self._through
@property
def repeats(self) -> int:
return self._repeats
@property
def stir_time(self) -> float:
return self._stir_time
@property
def stir_speed(self) -> float:
return self._stir_speed
@property
def time_spent(self) -> float:
if self._running:
self._time_spent = datetime.now() - self._start_time
return self._time_spent.total_seconds()
@property
def time_remaining(self) -> float:
if self._running:
elapsed = (datetime.now() - self._start_time).total_seconds()
total_time = (self._stir_time + self._settling_time + 10) * self._repeats
remain = max(0, total_time - elapsed)
self._time_remaining = timedelta(seconds=remain)
return self._time_remaining.total_seconds()
def separate(self, purpose: str, product_phase: str, from_vessel: str,
separation_vessel: str, to_vessel: str, waste_phase_to_vessel: str = "",
solvent: str = "", solvent_volume: float = 0.0, through: str = "",
repeats: int = 1, stir_time: float = 0.0, stir_speed: float = 0.0,
settling_time: float = 60.0) -> dict:
"""
执行分离操作
"""
with self._thread_lock:
# 检查是否已经在运行
if self._running:
return {
"success": False,
"status": "Error: Operation already in progress"
}
# 必填参数验证
if not all([from_vessel, separation_vessel, to_vessel]):
self._status = "Error: Missing required vessel parameters"
return {"success": False}
# 验证参数
if purpose not in ["wash", "extract"]:
self._status = "Error: Invalid purpose"
return {"success": False}
if product_phase not in ["top", "bottom"]:
self._status = "Error: Invalid product phase"
return {"success": False}
# 数值参数验证
try:
solvent_volume = float(solvent_volume)
repeats = int(repeats)
stir_time = float(stir_time)
stir_speed = float(stir_speed)
settling_time = float(settling_time)
except ValueError:
self._status = "Error: Invalid numeric parameters"
return {"success": False}
# 设置参数
self._purpose = purpose
self._product_phase = product_phase
self._from_vessel = from_vessel
self._separation_vessel = separation_vessel
self._to_vessel = to_vessel
self._waste_phase_to_vessel = waste_phase_to_vessel
self._solvent = solvent
self._solvent_volume = float(solvent_volume)
self._through = through
self._repeats = int(repeats)
self._stir_time = float(stir_time)
self._stir_speed = float(stir_speed)
self._settling_time = float(settling_time)
# 重置计时器
self._start_time = datetime.now()
self._time_spent = timedelta()
total_time = (self._stir_time + self._settling_time + 10) * self._repeats
self._time_remaining = timedelta(seconds=total_time)
# 启动分离操作
self._status = "Starting Separation"
self._running = True
# 在锁内创建和启动线程
self._operation_thread = threading.Thread(target=self._operation_loop)
self._operation_thread.daemon = True
self._operation_thread.start()
# 等待确认操作已经开始
time.sleep(0.1) # 短暂等待确保操作线程已启动
return {
"success": True,
"status": self._status,
"current_device": self._current_device,
"time_spent": self._time_spent.total_seconds(),
"time_remaining": self._time_remaining.total_seconds()
}
def shake(self, shake_time: float) -> str:
"""
@@ -75,10 +219,6 @@ class MockSeparator:
self._status = "Error: Invalid shake time"
return "Error"
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
with self._thread_lock:
self._status = "Shaking"
self._settling_time = 0.0
@@ -115,9 +255,6 @@ class MockSeparator:
"""
阀门控制命令:传入 "open""close"
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
command = command.lower()
if command == "open":
@@ -130,30 +267,109 @@ class MockSeparator:
self._status = "Error: Invalid valve command"
return "Error"
return "Success"
def _operation_loop(self):
"""分离操作主循环"""
try:
current_repeat = 1
# 立即更新状态确保不会停留在Starting Separation
with self._thread_lock:
self._status = f"Separation Cycle {current_repeat}/{self._repeats}"
while self._running and current_repeat <= self._repeats:
# 第一步:搅拌
if self._stir_time > 0:
with self._thread_lock:
self._status = f"Stirring (Repeat {current_repeat}/{self._repeats})"
remaining_stir = self._stir_time
while remaining_stir > 0 and self._running:
time.sleep(1)
remaining_stir -= 1
# 第二步:静置
if self._settling_time > 0:
with self._thread_lock:
self._status = f"Settling (Repeat {current_repeat}/{self._repeats})"
remaining_settle = self._settling_time
while remaining_settle > 0 and self._running:
time.sleep(1)
remaining_settle -= 1
# 第三步:打开阀门排出
with self._thread_lock:
self._valve_state = "Open"
self._status = f"Draining (Repeat {current_repeat}/{self._repeats})"
# 模拟排出时间5秒
time.sleep(10)
# 关闭阀门
with self._thread_lock:
self._valve_state = "Closed"
# 检查是否继续下一次重复
if current_repeat < self._repeats:
current_repeat += 1
else:
with self._thread_lock:
self._status = "Separation Complete"
break
except Exception as e:
with self._thread_lock:
self._status = f"Error in separation: {str(e)}"
finally:
with self._thread_lock:
self._running = False
self._valve_state = "Closed"
if self._status == "Starting Separation":
self._status = "Error: Operation failed to start"
elif self._status != "Separation Complete":
self._status = "Stopped"
def stop_operations(self) -> str:
"""
停止任何正在执行的操作
"""
"""停止任何正在执行的操作"""
with self._thread_lock:
self._running = False
if self._operation_thread and self._operation_thread.is_alive():
self._operation_thread.join(timeout=1.0)
self._operation_thread = None
self._settling_time = 0.0
self._status = "Idle"
self._shake_status = "Idle"
self._shake_time = 0.0
self._time_remaining = timedelta()
return "Success"
def get_status_info(self) -> dict:
"""
获取当前设备状态信息
"""
"""获取当前设备状态信息"""
with self._thread_lock:
current_time = datetime.now()
if self._start_time:
self._time_spent = current_time - self._start_time
return {
"status": self._status,
"power_state": self._power_state,
"valve_state": self._valve_state,
"settling_time": self._settling_time,
"shake_time": self._shake_time,
"shake_status": self._shake_status,
"current_device": self._current_device,
"purpose": self._purpose,
"product_phase": self._product_phase,
"from_vessel": self._from_vessel,
"separation_vessel": self._separation_vessel,
"to_vessel": self._to_vessel,
"waste_phase_to_vessel": self._waste_phase_to_vessel,
"solvent": self._solvent,
"solvent_volume": self._solvent_volume,
"through": self._through,
"repeats": self._repeats,
"stir_time": self._stir_time,
"stir_speed": self._stir_speed,
"time_spent": self._time_spent.total_seconds(),
"time_remaining": self._time_remaining.total_seconds()
}
@@ -162,7 +378,6 @@ if __name__ == "__main__":
separator = MockSeparator()
print("启动简单版分离器测试...")
print(separator.power_control("On"))
print("初始状态:", separator.get_status_info())
# 触发 shake 操作,模拟 10 秒的搅拌

View File

@@ -3,25 +3,11 @@ import threading
class MockStirrer:
"""
模拟搅拌器设备类
这个类模拟了一个实验室搅拌器的行为,包括搅拌速度控制、
温度监测、加热控制等功能。参考了现有的 HeaterStirrer_DaLong 实现。
"""
def __init__(self, port: str = "MOCK"):
"""
初始化MockStirrer实例
Args:
port (str): 设备端口,默认为"MOCK"表示模拟设备
"""
self.port = port
# 设备基本状态属性
self._status: str = "Idle" # 设备状态Idle, Running, Error, Stopped
self._power_state: str = "Off" # 电源状态On, Off
# 搅拌相关属性
self._stir_speed: float = 0.0 # 当前搅拌速度 (rpm)
@@ -46,52 +32,18 @@ class MockStirrer:
@property
def status(self) -> str:
"""
设备状态 - 会被自动识别的设备属性
Returns:
str: 当前设备状态 (Idle, Running, Error, Stopped)
"""
return self._status
@property
def power_state(self) -> str:
"""
电源状态
Returns:
str: 电源状态 (On, Off)
"""
return self._power_state
@property
def stir_speed(self) -> float:
"""
当前搅拌速度
Returns:
float: 当前搅拌速度 (rpm)
"""
return self._stir_speed
@property
def target_stir_speed(self) -> float:
"""
目标搅拌速度
Returns:
float: 目标搅拌速度 (rpm)
"""
return self._target_stir_speed
@property
def stir_state(self) -> str:
"""
搅拌状态
Returns:
str: 搅拌状态 (Running, Stopped)
"""
return self._stir_state
@property
@@ -116,86 +68,26 @@ class MockStirrer:
@property
def heating_state(self) -> str:
"""
加热状态
Returns:
str: 加热状态 (On, Off)
"""
return self._heating_state
@property
def heating_power(self) -> float:
"""
加热功率
Returns:
float: 加热功率百分比 (0-100)
"""
return self._heating_power
@property
def max_stir_speed(self) -> float:
"""
最大搅拌速度
Returns:
float: 最大搅拌速度 (rpm)
"""
return self._max_stir_speed
@property
def max_temperature(self) -> float:
"""
最大温度
Returns:
float: 最大温度 (°C)
"""
return self._max_temperature
# ==================== 设备控制方法 ====================
# 这些方法需要在注册表中添加会作为ActionServer接受控制指令
def power_control(self, power_state: str = "On") -> str:
"""
电源控制方法
Args:
power_state (str): 电源状态,可选值:"On", "Off"
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if power_state not in ["On", "Off"]:
self._status = "Error: Invalid power state"
return "Error"
self._power_state = power_state
if power_state == "On":
self._status = "Power On"
self._start_operation()
else:
self._status = "Power Off"
self.stop_all_operations()
return "Success"
def set_stir_speed(self, speed: float) -> str:
"""
设置搅拌速度
Args:
speed (float): 目标搅拌速度 (rpm)
Returns:
str: 操作结果状态 ("Success", "Error")
"""
speed = float(speed) # 确保传入的速度是浮点数
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if speed < 0 or speed > self._max_stir_speed:
self._status = f"Error: Speed out of range (0-{self._max_stir_speed})"
@@ -213,19 +105,7 @@ class MockStirrer:
return "Success"
def set_temperature(self, temperature: float) -> str:
"""
设置目标温度
Args:
temperature (float): 目标温度 (°C)
Returns:
str: 操作结果状态 ("Success", "Error")
"""
temperature = float(temperature) # 确保传入的温度是浮点数
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if temperature < 0 or temperature > self._max_temperature:
self._status = f"Error: Temperature out of range (0-{self._max_temperature})"
@@ -237,15 +117,6 @@ class MockStirrer:
return "Success"
def start_stirring(self) -> str:
"""
启动搅拌
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if self._target_stir_speed <= 0:
self._status = "Error: No target speed set"
@@ -256,30 +127,12 @@ class MockStirrer:
return "Success"
def stop_stirring(self) -> str:
"""
停止搅拌
Returns:
str: 操作结果状态 ("Success", "Error")
"""
self._stir_state = "Stopped"
self._target_stir_speed = 0.0
self._status = "Stirring Stopped"
return "Success"
def heating_control(self, heating_state: str = "On") -> str:
"""
加热控制
Args:
heating_state (str): 加热状态,可选值:"On", "Off"
Returns:
str: 操作结果状态 ("Success", "Error")
"""
if self._power_state != "On":
self._status = "Error: Power Off"
return "Error"
if heating_state not in ["On", "Off"]:
self._status = "Error: Invalid heating state"
@@ -296,12 +149,6 @@ class MockStirrer:
return "Success"
def stop_all_operations(self) -> str:
"""
停止所有操作
Returns:
str: 操作结果状态 ("Success", "Error")
"""
self._stir_state = "Stopped"
self._heating_state = "Off"
self._stop_operation()
@@ -325,13 +172,8 @@ class MockStirrer:
# ==================== 内部控制方法 ====================
def _start_operation(self):
"""
启动操作线程
这个方法启动一个后台线程来模拟搅拌器的实际运行过程。
"""
with self._thread_lock:
if not self._running and self._power_state == "On":
if not self._running:
self._running = True
self._operation_thread = threading.Thread(target=self._operation_loop)
self._operation_thread.daemon = True
@@ -349,15 +191,7 @@ class MockStirrer:
self._operation_thread.join(timeout=2.0)
def _operation_loop(self):
"""
操作主循环
这个方法在后台线程中运行,模拟真实搅拌器的工作过程:
1. 搅拌速度控制
2. 温度控制和加热
3. 状态更新
"""
while self._running and self._power_state == "On":
while self._running:
try:
# 处理搅拌速度控制
if self._stir_state == "Running":
@@ -431,19 +265,11 @@ class MockStirrer:
break
# 循环结束时的清理工作
if self._power_state == "On":
self._status = "Idle"
def get_status_info(self) -> dict:
"""
获取完整的设备状态信息
Returns:
dict: 包含所有设备状态的字典
"""
return {
"status": self._status,
"power_state": self._power_state,
"stir_speed": self._stir_speed,
"target_stir_speed": self._target_stir_speed,
"stir_state": self._stir_state,
@@ -462,7 +288,6 @@ if __name__ == "__main__":
# 测试基本功能
print("启动搅拌器测试...")
stirrer.power_control("On")
print(f"初始状态: {stirrer.get_status_info()}")
# 设置搅拌速度和温度

View File

@@ -0,0 +1,229 @@
import time
import threading
from datetime import datetime, timedelta
class MockStirrer_new:
def __init__(self, port: str = "MOCK"):
self.port = port
# 基本状态属性
self._status: str = "Idle"
self._vessel: str = ""
self._purpose: str = ""
# 搅拌相关属性
self._stir_speed: float = 0.0
self._target_stir_speed: float = 0.0
self._max_stir_speed: float = 2000.0
self._stir_state: str = "Stopped"
# 计时相关
self._stir_time: float = 0.0
self._settling_time: float = 0.0
self._start_time = datetime.now()
self._time_remaining = timedelta()
# 运行控制
self._operation_thread = None
self._running = False
self._thread_lock = threading.Lock()
# 创建操作线程
self._operation_thread = threading.Thread(target=self._operation_loop)
self._operation_thread.daemon = True
self._operation_thread.start()
# ==================== 状态属性 ====================
@property
def status(self) -> str:
return self._status
@property
def stir_speed(self) -> float:
return self._stir_speed
@property
def target_stir_speed(self) -> float:
return self._target_stir_speed
@property
def stir_state(self) -> str:
return self._stir_state
@property
def vessel(self) -> str:
return self._vessel
@property
def purpose(self) -> str:
return self._purpose
@property
def stir_time(self) -> float:
return self._stir_time
@property
def settling_time(self) -> float:
return self._settling_time
@property
def max_stir_speed(self) -> float:
return self._max_stir_speed
@property
def progress(self) -> float:
"""返回当前操作的进度0-100"""
if not self._running:
return 0.0
elapsed = (datetime.now() - self._start_time).total_seconds()
total_time = self._stir_time + self._settling_time
if total_time <= 0:
return 100.0
return min(100.0, (elapsed / total_time) * 100)
# ==================== Action Server 方法 ====================
def start_stir(self, vessel: str, stir_speed: float = 0.0, purpose: str = "") -> dict:
"""
StartStir.action 对应的方法
"""
with self._thread_lock:
if self._running:
return {
"success": False,
"message": "Operation already in progress"
}
try:
# 重置所有参数
self._vessel = vessel
self._purpose = purpose
self._stir_time = 0.0 # 连续搅拌模式下不设置搅拌时间
self._settling_time = 0.0
self._start_time = datetime.now() # 重置开始时间
if stir_speed > 0:
self._target_stir_speed = min(stir_speed, self._max_stir_speed)
self._stir_state = "Running"
self._status = "Stirring Started"
self._running = True
return {
"success": True,
"message": "Stirring started successfully"
}
except Exception as e:
return {
"success": False,
"message": f"Error: {str(e)}"
}
def stir(self, stir_time: float, stir_speed: float, settling_time: float) -> dict:
"""
Stir.action 对应的方法
"""
with self._thread_lock:
try:
# 如果已经在运行,先停止当前操作
if self._running:
self._running = False
self._stir_state = "Stopped"
self._target_stir_speed = 0.0
time.sleep(0.1) # 给一个短暂的停止时间
# 重置所有参数
self._stir_time = float(stir_time)
self._settling_time = float(settling_time)
self._target_stir_speed = min(float(stir_speed), self._max_stir_speed)
self._start_time = datetime.now() # 重置开始时间
self._stir_state = "Running"
self._status = "Stirring"
self._running = True
return {"success": True}
except ValueError:
self._status = "Error: Invalid parameters"
return {"success": False}
def stop_stir(self, vessel: str) -> dict:
"""
StopStir.action 对应的方法
"""
with self._thread_lock:
if vessel != self._vessel:
return {
"success": False,
"message": "Vessel mismatch"
}
self._running = False
self._stir_state = "Stopped"
self._target_stir_speed = 0.0
self._status = "Stirring Stopped"
return {
"success": True,
"message": "Stirring stopped successfully"
}
# ==================== 内部控制方法 ====================
def _operation_loop(self):
"""操作主循环"""
while True:
try:
current_time = datetime.now()
with self._thread_lock: # 添加锁保护
if self._stir_state == "Running":
# 实际搅拌逻辑
speed_diff = self._target_stir_speed - self._stir_speed
if abs(speed_diff) > 0.1:
adjustment = speed_diff * 0.1
self._stir_speed += adjustment
else:
self._stir_speed = self._target_stir_speed
# 更新进度
if self._running:
if self._stir_time > 0: # 定时搅拌模式
elapsed = (current_time - self._start_time).total_seconds()
if elapsed >= self._stir_time + self._settling_time:
self._running = False
self._stir_state = "Stopped"
self._target_stir_speed = 0.0
self._stir_speed = 0.0
self._status = "Stirring Complete"
elif elapsed >= self._stir_time:
self._status = "Settling"
else: # 连续搅拌模式
self._status = "Stirring"
else:
# 停止状态下慢慢降低速度
if self._stir_speed > 0:
self._stir_speed = max(0, self._stir_speed - 20.0)
time.sleep(0.1)
except Exception as e:
print(f"Error in operation loop: {str(e)}") # 添加错误输出
self._status = f"Error: {str(e)}"
time.sleep(1.0) # 错误发生时等待较长时间
def get_status_info(self) -> dict:
"""获取设备状态信息"""
return {
"status": self._status,
"vessel": self._vessel,
"purpose": self._purpose,
"stir_speed": self._stir_speed,
"target_stir_speed": self._target_stir_speed,
"stir_state": self._stir_state,
"stir_time": self._stir_time, # 添加
"settling_time": self._settling_time, # 添加
"progress": self.progress,
"max_stir_speed": self._max_stir_speed
}

View File

@@ -7,30 +7,35 @@ mock_chiller:
current_temperature: Float64
target_temperature: Float64
status: String
power_on: Bool
is_cooling: Bool
is_heating: Bool
vessel: String # 新增
purpose: String # 新增
action_value_mappings:
set_temperature:
type: FloatSingleInput
goal:
float_in: temperature
feedback: {}
result:
success: success
power_on_off:
type: StrSingleInput
goal:
string: power_state
feedback: {}
result:
success: success
emergency_stop:
type: EmptyIn
goal: {}
feedback: {}
result:
success: success
heat_chill_start:
type: HeatChillStart
goal:
vessel: vessel
temp: temp
purpose: purpose
feedback: {}
result:
success: success
status: status
heat_chill_stop:
type: HeatChillStop
goal:
vessel: vessel
feedback: {}
result:
success: success
status: status
schema:
type: object
properties:
@@ -43,20 +48,24 @@ mock_chiller:
status:
type: string
description: Current status of the device
power_on:
type: boolean
description: Power state of the device
is_cooling:
type: boolean
description: Whether the device is actively cooling
is_heating:
type: boolean
description: Whether the device is actively heating
vessel: # 新增
type: string
description: Current vessel being processed
purpose: # 新增
type: string
description: Purpose of the current operation
required:
- current_temperature
- target_temperature
- status
- power_on
- vessel
- purpose
additionalProperties: false
mock_filter:
description: Mock Filter Device
@@ -66,32 +75,43 @@ mock_filter:
status_types:
status: String
is_filtering: Bool
filter_efficiency: Float64
flow_rate: Float64
pressure_drop: Float64
filter_life: Float64
power_on: Bool
vessel: String
filtrate_vessel: String
filtered_volume: Float64
progress: Float64
stir: Bool
stir_speed: Float64
temperature: Float64
continue_heatchill: Bool
target_volume: Float64
action_value_mappings:
start_filtering:
type: FloatSingleInput
filter:
type: Filter
goal:
float_in: flow_rate
feedback: {}
vessel: vessel
filtrate_vessel: filtrate_vessel
stir: stir
stir_speed: stir_speed
temp: temp
continue_heatchill: continue_heatchill
volume: volume
feedback:
progress: progress
current_temp: current_temp
filtered_volume: filtered_volume
current_status: current_status
result:
success: success
message: message
stop_filtering:
type: EmptyIn
goal: {}
feedback: {}
result:
success: success
power_on_off:
type: StrSingleInput
goal:
string: power_state
feedback: {}
result:
success: success
replace_filter:
type: EmptyIn
goal: {}
@@ -107,9 +127,6 @@ mock_filter:
is_filtering:
type: boolean
description: Whether the filter is actively filtering
filter_efficiency:
type: number
description: Filter efficiency percentage
flow_rate:
type: number
description: Current flow rate in L/min
@@ -125,8 +142,12 @@ mock_filter:
required:
- status
- is_filtering
- filter_efficiency
- power_on
- flow_rate
- filter_life
- vessel
- filtrate_vessel
- filtered_volume
- progress
additionalProperties: false
mock_heater:
description: Mock Heater Device
@@ -137,29 +158,48 @@ mock_heater:
current_temperature: Float64
target_temperature: Float64
status: String
power_on: Bool
is_heating: Bool
heating_power: Float64
max_temperature: Float64
vessel: String
purpose: String
stir: Bool
stir_speed: Float64
action_value_mappings:
set_temperature:
type: FloatSingleInput
heat_chill_start:
type: HeatChillStart
goal:
float_in: temperature
feedback: {}
vessel: vessel
temp: temp
purpose: purpose
feedback:
status: status
result:
success: success
set_heating_power:
type: FloatSingleInput
heat_chill_stop:
type: HeatChillStop
goal:
float_in: power
feedback: {}
vessel: vessel
feedback:
status: status
result:
success: success
power_on_off:
type: StrSingleInput
heat_chill:
type: HeatChill
goal:
string: power_state
vessel: vessel
temp: temp
time: time
stir: stir
stir_speed: stir_speed
purpose: purpose
feedback:
status: status
result:
success: success
emergency_stop:
type: EmptyIn
goal: {}
feedback: {}
result:
success: success
@@ -168,16 +208,13 @@ mock_heater:
properties:
current_temperature:
type: number
description: Current temperature of the heater
description: Current temperature of the heater in °C
target_temperature:
type: number
description: Target temperature setting
description: Target temperature setting in °C
status:
type: string
description: Current status of the device
power_on:
type: boolean
description: Power state of the device
is_heating:
type: boolean
description: Whether the device is actively heating
@@ -186,12 +223,25 @@ mock_heater:
description: Current heating power percentage
max_temperature:
type: number
description: Maximum heating temperature limit
description: Maximum temperature limit
vessel:
type: string
description: Current vessel being heated
purpose:
type: string
description: Purpose of the heating operation
stir:
type: boolean
description: Whether stirring is enabled
stir_speed:
type: number
description: Current stirring speed
required:
- current_temperature
- target_temperature
- status
- power_on
- vessel
- purpose
additionalProperties: false
mock_pump:
description: Mock Pump Device
@@ -200,42 +250,47 @@ mock_pump:
type: python
status_types:
status: String
power_state: String
pump_state: String
flow_rate: Float64
target_flow_rate: Float64
pressure: Float64
total_volume: Float64
direction: String
max_flow_rate: Float64
max_pressure: Float64
from_vessel: String
to_vessel: String
transfer_volume: Float64
amount: String
transfer_time: Float64
is_viscous: Bool
rinsing_solvent: String
rinsing_volume: Float64
rinsing_repeats: Int32
is_solid: Bool
time_spent: Float64
time_remaining: Float64
current_device: String
action_value_mappings:
power_control:
type: StrSingleInput
goal:
string: power_state
feedback: {}
result:
success: success
set_flow_rate:
type: FloatSingleInput
goal:
float_in: flow_rate
feedback: {}
result:
success: success
start_pump:
type: EmptyIn
goal: {}
feedback: {}
result:
success: success
stop_pump:
type: EmptyIn
goal: {}
feedback: {}
result:
success: success
pump_transfer:
type: PumpTransfer
goal:
from_vessel: from_vessel
to_vessel: to_vessel
volume: volume
amount: amount
time: time
viscous: viscous
rinsing_solvent: rinsing_solvent
rinsing_volume: rinsing_volume
rinsing_repeats: rinsing_repeats
solid: solid
feedback:
status: status
current_device: current_device
time_spent: time_spent
time_remaining: time_remaining
result:
success: success
pause_pump:
type: EmptyIn
goal: {}
@@ -248,13 +303,6 @@ mock_pump:
feedback: {}
result:
success: success
set_direction:
type: StrSingleInput
goal:
string: direction
feedback: {}
result:
success: success
reset_volume_counter:
type: EmptyIn
goal: {}
@@ -267,9 +315,6 @@ mock_pump:
status:
type: string
description: Current status of the pump
power_state:
type: string
description: Power state (On/Off)
pump_state:
type: string
description: Pump operation state (Running/Stopped/Paused)
@@ -285,20 +330,51 @@ mock_pump:
total_volume:
type: number
description: Total accumulated volume in mL
direction:
type: string
description: Pump direction (Forward/Reverse)
max_flow_rate:
type: number
description: Maximum flow rate in mL/min
max_pressure:
type: number
description: Maximum pressure in bar
from_vessel:
type: string
description: Source vessel for transfer
to_vessel:
type: string
description: Target vessel for transfer
transfer_volume:
type: number
description: Volume to transfer in mL
amount:
type: string
description: Amount description
transfer_time:
type: number
description: Transfer time in seconds
is_viscous:
type: boolean
description: Whether the liquid is viscous
rinsing_solvent:
type: string
description: Solvent used for rinsing
rinsing_volume:
type: number
description: Volume used for rinsing
rinsing_repeats:
type: integer
description: Number of rinsing cycles
is_solid:
type: boolean
description: Whether transferring solid material
current_device:
type: string
description: Current device identifier
required:
- status
- power_state
- pump_state
- flow_rate
- from_vessel
- to_vessel
additionalProperties: false
mock_rotavap:
description: Mock Rotavap Device
@@ -307,7 +383,6 @@ mock_rotavap:
type: python
status_types:
status: String
power_state: String
rotate_state: String
rotate_time: Float64
rotate_speed: Float64
@@ -325,13 +400,6 @@ mock_rotavap:
feedback: {}
result:
success: success
power_control:
type: StrSingleInput
goal:
string: power_state
feedback: {}
result:
success: success
set_rotate_time:
type: FloatSingleInput
goal:
@@ -378,9 +446,6 @@ mock_rotavap:
status:
type: string
description: Current status of the rotavap
power_state:
type: string
description: Power state (On/Off)
rotate_state:
type: string
description: Rotation state (Running/Stopped)
@@ -421,12 +486,49 @@ mock_separator:
type: python
status_types:
status: String
power_state: String
settling_time: Float64
valve_state: String
shake_time: Float64
shake_status: String
current_device: String
purpose: String
product_phase: String
from_vessel: String
separation_vessel: String
to_vessel: String
waste_phase_to_vessel: String
solvent: String
solvent_volume: Float64
through: String
repeats: Int32
stir_time: Float64
stir_speed: Float64
time_spent: Float64
time_remaining: Float64
action_value_mappings:
separate:
type: Separate
goal:
purpose: purpose
product_phase: product_phase
from_vessel: from_vessel
separation_vessel: separation_vessel
to_vessel: to_vessel
waste_phase_to_vessel: waste_phase_to_vessel
solvent: solvent
solvent_volume: solvent_volume
through: through
repeats: repeats
stir_time: stir_time
stir_speed: stir_speed
settling_time: settling_time
feedback:
status: status
current_device: current_device
time_spent: time_spent
time_remaining: time_remaining
result:
success: success
shake:
type: FloatSingleInput
goal:
@@ -435,28 +537,25 @@ mock_separator:
status: status
result:
success: success
power_control:
type: StrSingleInput
goal:
string: power_state
feedback: {}
result:
success: success
stop_operations:
type: EmptyIn
goal: {}
feedback: {}
result:
success: success
set_valve:
type: StrSingleInput
goal:
string: command
feedback: {}
result:
success: success
schema:
type: object
properties:
status:
type: string
description: Current status of the separator
power_state:
type: string
description: Power state (On/Off)
settling_time:
type: number
description: Settling time in seconds
@@ -468,10 +567,27 @@ mock_separator:
description: Remaining shake time in seconds
shake_status:
type: string
description: Current shake state (e.g. Shaking, Settling, Idle)
description: Current shake state
purpose:
type: string
description: Separation purpose (wash/extract)
product_phase:
type: string
description: Product phase (top/bottom)
from_vessel:
type: string
description: Source vessel
separation_vessel:
type: string
description: Vessel for separation
to_vessel:
type: string
description: Target vessel
required:
- status
- power_state
- valve_state
- shake_status
- current_device
additionalProperties: false
mock_solenoid_valve:
description: Mock Solenoid Valve Device
@@ -521,7 +637,6 @@ mock_stirrer:
type: python
status_types:
status: String
power_state: String
stir_speed: Float64
target_stir_speed: Float64
stir_state: String
@@ -532,13 +647,6 @@ mock_stirrer:
max_stir_speed: Float64
max_temperature: Float64
action_value_mappings:
power_control:
type: StrSingleInput
goal:
string: power_state
feedback: {}
result:
success: success
set_stir_speed:
type: FloatSingleInput
goal:
@@ -578,9 +686,6 @@ mock_stirrer:
status:
type: string
description: Current status of the stirrer
power_state:
type: string
description: Power state (On/Off)
stir_speed:
type: number
description: Current stirring speed in rpm
@@ -614,6 +719,85 @@ mock_stirrer:
- temperature
- power_state
additionalProperties: false
mock_stirrer_new:
description: Mock Stirrer Device (Copy Version)
class:
module: unilabos.devices.mock.mock_stirrer_new:MockStirrer_new
type: python
status_types:
status: String
vessel: String
purpose: String
stir_speed: Float64
target_stir_speed: Float64
stir_state: String
stir_time: Float64
settling_time: Float64
progress: Float64
max_stir_speed: Float64
action_value_mappings:
start_stir:
type: StartStir
goal:
vessel: vessel
stir_speed: stir_speed
purpose: purpose
feedback:
progress: progress
current_speed: stir_speed
current_status: status
result:
success: success
message: message
stir:
type: Stir
goal:
stir_time: stir_time
stir_speed: stir_speed
settling_time: settling_time
feedback:
status: status
result:
success: success
stop_stir:
type: StopStir
goal:
vessel: vessel
feedback:
progress: progress
current_status: status
result:
success: success
message: message
schema:
type: object
properties:
status:
type: string
vessel:
type: string
purpose:
type: string
stir_speed:
type: number
target_stir_speed:
type: number
stir_state:
type: string
stir_time:
type: number
settling_time:
type: number
progress:
type: number
max_stir_speed:
type: number
required:
- status
- stir_speed
- stir_state
- vessel
additionalProperties: false
mock_vacuum:
description: Mock Vacuum Pump Device
class:

View File

@@ -28,7 +28,17 @@ set(action_files
"action/HeatChill.action"
"action/HeatChillStart.action"
"action/HeatChillStop.action"
"action/Filter.action"
"action/Add.action"
"action/Centrifuge.action"
"action/Crystallize.action"
"action/Dry.action"
"action/Purge.action"
"action/StartPurge.action"
"action/StartStir.action"
"action/StopPurge.action"
"action/StopStir.action"
"action/Transfer.action"
"action/LiquidHandlerAspirate.action"
"action/LiquidHandlerDiscardTips.action"
"action/LiquidHandlerDispense.action"

View File

@@ -0,0 +1,19 @@
# Goal - 添加试剂的目标参数
string vessel # 目标容器
string reagent # 试剂名称
float64 volume # 体积 (可选)
float64 mass # 质量 (可选)
string amount # 数量描述 (可选)
float64 time # 添加时间 (可选)
bool stir # 是否搅拌
float64 stir_speed # 搅拌速度 (可选)
bool viscous # 是否为粘性液体
string purpose # 添加目的 (可选)
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
string current_status # 当前状态描述

View File

@@ -0,0 +1,15 @@
# Goal - 离心操作的目标参数
string vessel # 离心容器
float64 speed # 离心速度 (rpm)
float64 time # 离心时间 (秒)
float64 temp # 温度 (可选,摄氏度)
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
float64 current_speed # 当前转速
float64 current_temp # 当前温度
string current_status # 当前状态描述

View File

@@ -0,0 +1,13 @@
# Goal - 结晶操作的目标参数
string vessel # 结晶容器
float64 ramp_time # 升温/降温时间 (可选,秒)
float64 ramp_temp # 目标温度 (可选,摄氏度)
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
float64 current_temp # 当前温度
string current_status # 当前状态描述

View File

@@ -0,0 +1,16 @@
# Goal - 干燥操作的目标参数
string vessel # 干燥容器
float64 time # 干燥时间 (可选,秒)
float64 pressure # 压力 (可选Pa)
float64 temp # 温度 (可选,摄氏度)
bool continue_heatchill # 是否继续加热冷却
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
float64 current_temp # 当前温度
float64 current_pressure # 当前压力
string current_status # 当前状态描述

View File

@@ -0,0 +1,18 @@
# Goal - 过滤操作的目标参数
string vessel # 过滤容器
string filtrate_vessel # 滤液容器 (可选)
bool stir # 是否搅拌
float64 stir_speed # 搅拌速度 (可选)
float64 temp # 温度 (可选,摄氏度)
bool continue_heatchill # 是否继续加热冷却
float64 volume # 过滤体积 (可选)
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
float64 current_temp # 当前温度
float64 filtered_volume # 已过滤体积
string current_status # 当前状态描述

View File

@@ -0,0 +1,16 @@
# Goal - 清洗/吹扫操作的目标参数
string vessel # 清洗容器
string gas # 清洗气体 (可选)
float64 time # 清洗时间 (可选,秒)
float64 pressure # 压力 (可选Pa)
float64 flow_rate # 流速 (可选mL/min)
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
float64 current_pressure # 当前压力
float64 current_flow_rate # 当前流速
string current_status # 当前状态描述

View File

@@ -0,0 +1,15 @@
# Goal - 启动清洗/吹扫操作的目标参数
string vessel # 清洗容器
string gas # 清洗气体 (可选)
float64 pressure # 压力 (可选Pa)
float64 flow_rate # 流速 (可选mL/min)
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
float64 current_pressure # 当前压力
float64 current_flow_rate # 当前流速
string current_status # 当前状态描述

View File

@@ -0,0 +1,13 @@
# Goal - 启动搅拌操作的目标参数
string vessel # 搅拌容器
float64 stir_speed # 搅拌速度 (可选rpm)
string purpose # 搅拌目的 (可选)
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
float64 current_speed # 当前搅拌速度
string current_status # 当前状态描述

View File

@@ -0,0 +1,10 @@
# Goal - 停止清洗/吹扫操作的目标参数
string vessel # 清洗容器
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
string current_status # 当前状态描述

View File

@@ -0,0 +1,10 @@
# Goal - 停止搅拌操作的目标参数
string vessel # 搅拌容器
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
string current_status # 当前状态描述

View File

@@ -0,0 +1,19 @@
string from_vessel # 源容器
string to_vessel # 目标容器
float64 volume # 转移体积 (可选)
string amount # 数量描述 (可选)
float64 time # 转移时间 (可选,秒)
bool viscous # 是否为粘性液体
string rinsing_solvent # 冲洗溶剂 (可选)
float64 rinsing_volume # 冲洗体积 (可选)
int32 rinsing_repeats # 冲洗重复次数
bool solid # 是否涉及固体
---
# Result - 操作结果
bool success # 操作是否成功
string message # 结果消息
---
# Feedback - 实时反馈
float64 progress # 进度百分比 (0-100)
float64 transferred_volume # 已转移体积
string current_status # 当前状态描述