Add Device MockFilter

This commit is contained in:
KCFeng425
2025-06-05 13:34:54 +08:00
parent ff76bb1a76
commit 8c81dab7e1
4 changed files with 267 additions and 0 deletions

View File

@@ -0,0 +1,30 @@
{
"nodes": [
{
"id": "MockFilter1",
"name": "模拟过滤器",
"children": [],
"parent": null,
"type": "device",
"class": "MockFilter",
"position": {
"x": 620.6111111111111,
"y": 171,
"z": 0
},
"config": {
"port": "MOCK"
},
"data": {
"status": "Idle",
"is_filtering": false,
"filter_efficiency": 95.0,
"flow_rate": 0.0,
"pressure_drop": 0.0,
"filter_life": 100.0,
"power_on": false
}
}
],
"links": []
}

View File

@@ -0,0 +1,167 @@
import time
import threading
class MockFilter:
def __init__(self, port: str = "MOCK"):
self.port = port
self._status: str = "Idle"
self._is_filtering: bool = False
self._filter_efficiency: float = 95.0 # 过滤效率百分比
self._flow_rate: float = 0.0 # 流速 L/min
self._pressure_drop: float = 0.0 # 压降 Pa
self._filter_life: float = 100.0 # 滤芯寿命百分比
self._power_on: bool = False
# 模拟过滤过程的线程
self._filter_thread = None
self._running = False
@property
def status(self) -> str:
"""设备状态 - 会被自动识别的设备属性"""
return self._status
@property
def is_filtering(self) -> bool:
"""是否正在过滤"""
return self._is_filtering
@property
def filter_efficiency(self) -> float:
"""过滤效率"""
return self._filter_efficiency
@property
def flow_rate(self) -> float:
"""流速"""
return self._flow_rate
@property
def pressure_drop(self) -> float:
"""压降"""
return self._pressure_drop
@property
def filter_life(self) -> float:
"""滤芯寿命"""
return self._filter_life
@property
def power_on(self) -> bool:
"""电源状态"""
return self._power_on
def start_filtering(self, flow_rate: float = 1.0):
"""开始过滤 - 需要在注册表添加的设备动作"""
if not self._power_on:
self._status = "Error: Power Off"
return False
self._flow_rate = flow_rate
self._status = "Starting Filter"
self._start_filter_process()
return True
def stop_filtering(self):
"""停止过滤"""
self._status = "Stopping Filter"
self._stop_filter_process()
self._flow_rate = 0.0
self._is_filtering = False
self._status = "Idle"
return True
def power_on_off(self, power_state: str):
"""开关机控制"""
if power_state == "on":
self._power_on = True
self._status = "Power On"
else:
self._power_on = False
self._status = "Power Off"
self._stop_filter_process()
self._is_filtering = False
self._flow_rate = 0.0
def replace_filter(self):
"""更换滤芯"""
self._filter_life = 100.0
self._filter_efficiency = 95.0
self._status = "Filter Replaced"
return True
def _start_filter_process(self):
"""启动过滤过程线程"""
if not self._running and self._power_on:
self._running = True
self._is_filtering = True
self._filter_thread = threading.Thread(target=self._filter_loop)
self._filter_thread.daemon = True
self._filter_thread.start()
def _stop_filter_process(self):
"""停止过滤过程"""
self._running = False
if self._filter_thread:
self._filter_thread.join(timeout=1.0)
def _filter_loop(self):
"""过滤过程循环 - 模拟真实过滤器的工作过程"""
while self._running and self._power_on and self._is_filtering:
self._status = "Filtering"
# 模拟滤芯磨损
if self._filter_life > 0:
self._filter_life -= 0.1 # 每秒减少0.1%寿命
# 根据滤芯寿命调整效率和压降
life_factor = self._filter_life / 100.0
self._filter_efficiency = 95.0 * life_factor + 50.0 * (1 - life_factor)
self._pressure_drop = 100.0 + (200.0 * (1 - life_factor)) # 压降随磨损增加
# 检查滤芯是否需要更换
if self._filter_life <= 10.0:
self._status = "Filter Needs Replacement"
time.sleep(1.0) # 每秒更新一次
def emergency_stop(self):
"""紧急停止"""
self._status = "Emergency Stop"
self._stop_filter_process()
self._is_filtering = False
self._flow_rate = 0.0
def get_status_info(self) -> dict:
"""获取完整状态信息"""
return {
"status": self._status,
"is_filtering": self._is_filtering,
"filter_efficiency": self._filter_efficiency,
"flow_rate": self._flow_rate,
"pressure_drop": self._pressure_drop,
"filter_life": self._filter_life,
"power_on": self._power_on
}
# 用于测试的主函数
if __name__ == "__main__":
filter_device = MockFilter()
# 测试基本功能
print("启动过滤器测试...")
filter_device.power_on_off("on")
print(f"初始状态: {filter_device.get_status_info()}")
# 开始过滤
filter_device.start_filtering(2.0)
# 模拟运行10秒
for i in range(10):
time.sleep(1)
print(f"{i+1}秒: 效率={filter_device.filter_efficiency:.1f}%, 寿命={filter_device.filter_life:.1f}%, 状态={filter_device.status}")
filter_device.emergency_stop()
print("测试完成")

View File

@@ -0,0 +1,70 @@
MockFilter:
description: Mock Filter Device
class:
module: unilabos.devices.Mock.MockFilter.MockFilter:MockFilter
type: python
status_types:
status: String
is_filtering: Bool
filter_efficiency: Float64
flow_rate: Float64
pressure_drop: Float64
filter_life: Float64
power_on: Bool
action_value_mappings:
start_filtering:
type: SendCmd
goal:
command: flow_rate
feedback: {}
result:
success: success
stop_filtering:
type: SendCmd
goal: {}
feedback: {}
result:
success: success
power_on_off:
type: SendCmd
goal:
command: power_state
feedback: {}
result:
success: success
replace_filter:
type: SendCmd
goal: {}
feedback: {}
result:
success: success
schema:
type: object
properties:
status:
type: string
description: Current status of the filter
is_filtering:
type: boolean
description: Whether the filter is actively filtering
filter_efficiency:
type: number
description: Filter efficiency percentage
flow_rate:
type: number
description: Current flow rate in L/min
pressure_drop:
type: number
description: Pressure drop across the filter in Pa
filter_life:
type: number
description: Remaining filter life percentage
power_on:
type: boolean
description: Power state of the device
required:
- status
- is_filtering
- filter_efficiency
- power_on
additionalProperties: false