diff --git a/unilabos/devices/virtual/virtual_multiway_valve.py b/unilabos/devices/virtual/virtual_multiway_valve.py new file mode 100644 index 0000000..e69fd7c --- /dev/null +++ b/unilabos/devices/virtual/virtual_multiway_valve.py @@ -0,0 +1,111 @@ +import time +from typing import Union + + +class VirtualMultiwayValve: + """ + 虚拟八通阀门 - 可将一个输入切换到8个输出中的任意一个 + """ + def __init__(self, port: str = "VIRTUAL", positions: int = 8): + self.port = port + self.max_positions = positions + + # 状态属性 + self._status = "Idle" + self._valve_state = "Ready" + self._current_position = 1 + self._target_position = 1 + + @property + def status(self) -> str: + return self._status + + @property + def valve_state(self) -> str: + return self._valve_state + + @property + def current_position(self) -> int: + return self._current_position + + @property + def target_position(self) -> int: + return self._target_position + + def get_current_position(self) -> int: + """获取当前阀门位置""" + return self._current_position + + def set_position(self, position: Union[int, str]): + """ + 设置阀门位置 + + Args: + position: 目标位置 (1-8) + """ + try: + pos = int(position) + if pos < 1 or pos > self.max_positions: + raise ValueError(f"Position must be between 1 and {self.max_positions}") + + self._status = "Busy" + self._valve_state = "Moving" + self._target_position = pos + + # 模拟阀门切换时间 + switch_time = abs(self._current_position - pos) * 0.5 # 每个位置0.5秒 + time.sleep(switch_time) + + self._current_position = pos + self._status = "Idle" + self._valve_state = "Ready" + + return f"Position set to {pos}" + + except ValueError as e: + self._status = "Error" + self._valve_state = "Error" + return f"Error: {str(e)}" + + def open(self): + """打开阀门 - 对于多通阀门,相当于设置到位置1""" + return self.set_position(1) + + def close(self): + """关闭阀门 - 对于多通阀门,相当于设置到关闭位置""" + self._status = "Busy" + self._valve_state = "Closing" + time.sleep(0.5) + + self._current_position = 0 # 0表示关闭状态 + self._status = "Idle" + self._valve_state = "Closed" + + return "Valve closed" + + def get_valve_position(self) -> int: + """获取阀门位置 - 兼容性方法""" + return self._current_position + + def is_at_position(self, position: int) -> bool: + """检查是否在指定位置""" + return self._current_position == position + + def get_available_positions(self) -> list: + """获取可用位置列表""" + return list(range(1, self.max_positions + 1)) + + def reset(self): + """重置阀门到初始位置""" + return self.set_position(1) + + def get_info(self) -> dict: + """获取阀门信息""" + return { + "port": self.port, + "max_positions": self.max_positions, + "current_position": self._current_position, + "target_position": self._target_position, + "status": self._status, + "valve_state": self._valve_state + } \ No newline at end of file diff --git a/unilabos/devices/virtual/virtual_solenoid_valve.py b/unilabos/devices/virtual/virtual_solenoid_valve.py new file mode 100644 index 0000000..92e8baf --- /dev/null +++ b/unilabos/devices/virtual/virtual_solenoid_valve.py @@ -0,0 +1,151 @@ +import time +from typing import Union + + +class VirtualSolenoidValve: + """ + 虚拟电磁阀门 - 简单的开关型阀门,只有开启和关闭两个状态 + """ + def __init__(self, port: str = "VIRTUAL", voltage: float = 12.0, response_time: float = 0.1): + self.port = port + self.voltage = voltage + self.response_time = response_time + + # 状态属性 + self._status = "Idle" + self._valve_state = "Closed" # "Open" or "Closed" + self._is_open = False + + @property + def status(self) -> str: + return self._status + + @property + def valve_state(self) -> str: + return self._valve_state + + @property + def is_open(self) -> bool: + return self._is_open + + def get_valve_position(self) -> str: + """获取阀门位置状态""" + return "OPEN" if self._is_open else "CLOSED" + + def set_valve_position(self, position: Union[str, bool]): + """ + 设置阀门位置 + + Args: + position: "OPEN"/"CLOSED" 或 True/False + """ + self._status = "Busy" + + # 模拟阀门响应时间 + time.sleep(self.response_time) + + if isinstance(position, str): + target_open = position.upper() == "OPEN" + elif isinstance(position, bool): + target_open = position + else: + self._status = "Error" + return "Error: Invalid position" + + self._is_open = target_open + self._valve_state = "Open" if target_open else "Closed" + self._status = "Idle" + + return f"Valve {'opened' if target_open else 'closed'}" + + def open(self): + """打开电磁阀""" + self._status = "Busy" + time.sleep(self.response_time) + + self._is_open = True + self._valve_state = "Open" + self._status = "Idle" + + return "Valve opened" + + def close(self): + """关闭电磁阀""" + self._status = "Busy" + time.sleep(self.response_time) + + self._is_open = False + self._valve_state = "Closed" + self._status = "Idle" + + return "Valve closed" + + def set_state(self, state: Union[bool, str]): + """ + 设置阀门状态 + + Args: + state: True/False 或 "open"/"close" + """ + if isinstance(state, bool): + return self.open() if state else self.close() + elif isinstance(state, str): + if state.lower() in ["open", "on", "true", "1"]: + return self.open() + elif state.lower() in ["close", "closed", "off", "false", "0"]: + return self.close() + else: + self._status = "Error" + return "Error: Invalid state" + else: + self._status = "Error" + return "Error: Invalid state type" + + def toggle(self): + """切换阀门状态""" + if self._is_open: + return self.close() + else: + return self.open() + + def is_closed(self) -> bool: + """检查阀门是否关闭""" + return not self._is_open + + def get_state(self) -> dict: + """获取阀门完整状态""" + return { + "port": self.port, + "voltage": self.voltage, + "response_time": self.response_time, + "is_open": self._is_open, + "valve_state": self._valve_state, + "status": self._status, + "position": self.get_valve_position() + } + + def reset(self): + """重置阀门到关闭状态""" + return self.close() + + def test_cycle(self, cycles: int = 3, delay: float = 1.0): + """ + 测试阀门开关循环 + + Args: + cycles: 循环次数 + delay: 每次开关间隔时间(秒) + """ + results = [] + for i in range(cycles): + # 打开 + result_open = self.open() + results.append(f"Cycle {i+1} - Open: {result_open}") + time.sleep(delay) + + # 关闭 + result_close = self.close() + results.append(f"Cycle {i+1} - Close: {result_close}") + time.sleep(delay) + + return results \ No newline at end of file diff --git a/unilabos/registry/devices/virtual_device.yaml b/unilabos/registry/devices/virtual_device.yaml index dd8de8b..8122a1f 100644 --- a/unilabos/registry/devices/virtual_device.yaml +++ b/unilabos/registry/devices/virtual_device.yaml @@ -1,3 +1,49 @@ +# 虚拟设备清单及连接特性 + +# 1. virtual_pump - 虚拟泵 +# 描述:具有多通道阀门特性的泵,根据valve_position可连接多个容器 +# 连接特性:1个输入口 + 1个输出口(当前配置,实际应该有多个输出口) +# 数据类型:fluid(流体连接) + +# 2. virtual_stirrer - 虚拟搅拌器 +# 描述:机械连接设备,提供搅拌功能 +# 连接特性:1个双向连接点(bidirectional) +# 数据类型:mechanical(机械连接) + +# 3a. virtual_valve - 虚拟八通阀门 +# 描述:8通阀门(实际配置为7通),可切换流向 +# 连接特性:1个口连接注射泵 + 7个输出口 +# 数据类型:fluid(流体连接) + +# 3b. virtual_solenoid_valve (电磁阀门) +# 描述:简单的开关型电磁阀,只有开启和关闭两个状态 +# 连接特性:1个输入口 + 1个输出口,控制通断 +# 数据类型:fluid(流体连接) + +# 4. virtual_centrifuge - 虚拟离心机 +# 描述:单个样品处理设备,原地处理样品 +# 连接特性:1个输入口 + 1个输出口 +# 数据类型:resource(资源/样品连接) + +# 5. virtual_filter - 虚拟过滤器 +# 描述:分离设备,将样品分离为滤液和滤渣 +# 连接特性:1个输入口 + 2个输出口(滤液和滤渣) +# 数据类型:resource(资源/样品连接) + +# 6. virtual_heatchill - 虚拟加热/冷却器 +# 描述:温控设备,容器直接放置在设备上进行温度控制 +# 连接特性:1个双向连接点(bidirectional) +# 数据类型:mechanical(机械/物理接触连接) + +# 7. virtual_transfer_pump - 虚拟转移泵(注射器式) +# 描述:注射器式转移泵,通过同一个口吸入和排出液体 +# 连接特性:1个双向连接点(bidirectional) +# 数据类型:fluid(流体连接) + +# 8. virtual_column - 虚拟色谱柱 +# 描述:分离纯化设备,用于样品纯化 +# 连接特性:1个输入口 + 1个输出口 +# 数据类型:resource(资源/样品连接) virtual_pump: description: Virtual Pump for PumpTransferProtocol Testing class: @@ -124,10 +170,10 @@ virtual_stirrer: default: 1000.0 additionalProperties: false -virtual_valve: - description: Virtual Valve for AddProtocol Testing +virtual_multiway_valve: + description: Virtual 8-Way Valve for flow direction control class: - module: unilabos.devices.virtual.virtual_valve:VirtualValve + module: unilabos.devices.virtual.virtual_multiway_valve:VirtualMultiwayValve type: python status_types: status: String @@ -139,10 +185,90 @@ virtual_valve: set_position: type: SendCmd goal: - command: position + position: position feedback: {} result: success: success + # 八通阀门节点配置 - 1个输入口,8个输出口,可切换流向 + handles: + input: + - handler_key: multiway-valve-inlet + label: Valve Inlet + data_type: fluid + io_type: target + data_source: handle + data_key: fluid_in + description: "八通阀门进液口,接收来源流体" + output: + - handler_key: multiway-valve-port-1 + label: Port 1 + data_type: fluid + io_type: source + data_source: executor + data_key: fluid_port_1 + description: "八通阀门端口1,position=1时流体从此口流出" + - handler_key: multiway-valve-port-2 + label: Port 2 + data_type: fluid + io_type: source + data_source: executor + data_key: fluid_port_2 + description: "八通阀门端口2,position=2时流体从此口流出" + - handler_key: multiway-valve-port-3 + label: Port 3 + data_type: fluid + io_type: source + data_source: executor + data_key: fluid_port_3 + description: "八通阀门端口3,position=3时流体从此口流出" + - handler_key: multiway-valve-port-4 + label: Port 4 + data_type: fluid + io_type: source + data_source: executor + data_key: fluid_port_4 + description: "八通阀门端口4,position=4时流体从此口流出" + - handler_key: multiway-valve-port-5 + label: Port 5 + data_type: fluid + io_type: source + data_source: executor + data_key: fluid_port_5 + description: "八通阀门端口5,position=5时流体从此口流出" + - handler_key: multiway-valve-port-6 + label: Port 6 + data_type: fluid + io_type: source + data_source: executor + data_key: fluid_port_6 + description: "八通阀门端口6,position=6时流体从此口流出" + - handler_key: multiway-valve-port-7 + label: Port 7 + data_type: fluid + io_type: source + data_source: executor + data_key: fluid_port_7 + description: "八通阀门端口7,position=7时流体从此口流出" + schema: + type: object + properties: + port: + type: string + default: "VIRTUAL" + positions: + type: integer + default: 8 + additionalProperties: false +virtual_solenoid_valve: + description: Virtual Solenoid Valve for simple on/off flow control + class: + module: unilabos.devices.virtual.virtual_solenoid_valve:VirtualSolenoidValve + type: python + status_types: + status: String + valve_state: String # "open" or "closed" + is_open: Boolean + action_value_mappings: open: type: EmptyIn goal: {} @@ -155,77 +281,36 @@ virtual_valve: feedback: {} result: success: success - # 虚拟阀门节点配置 - 6通阀门,1个输入口,6个输出口,可切换流向 + set_state: + type: BoolSingleInput + goal: + state: state + feedback: {} + result: + success: success + # 电磁阀门节点配置 - 双向流通的开关型阀门,流动方向由泵决定 handles: - input: - - handler_key: valve-inlet - label: Valve Inlet + bidirectional: + - handler_key: solenoid-valve-port + label: Valve Port data_type: fluid - io_type: target + io_type: bidirectional data_source: handle - data_key: fluid_in - description: "阀门进液口,接收来源流体" - output: - - handler_key: valve-port-1 - label: Port 1 - data_type: fluid - io_type: source - data_source: executor - data_key: fluid_port_1 - description: "阀门端口1,position=1时流体从此口流出" - - handler_key: valve-port-2 - label: Port 2 - data_type: fluid - io_type: source - data_source: executor - data_key: fluid_port_2 - description: "阀门端口2,position=2时流体从此口流出" - - handler_key: valve-port-3 - label: Port 3 - data_type: fluid - io_type: source - data_source: executor - data_key: fluid_port_3 - description: "阀门端口3,position=3时流体从此口流出" - - handler_key: valve-port-4 - label: Port 4 - data_type: fluid - io_type: source - data_source: executor - data_key: fluid_port_4 - description: "阀门端口4,position=4时流体从此口流出" - - handler_key: valve-port-5 - label: Port 5 - data_type: fluid - io_type: source - data_source: executor - data_key: fluid_port_5 - description: "阀门端口5,position=5时流体从此口流出" - - handler_key: valve-port-6 - label: Port 6 - data_type: fluid - io_type: source - data_source: executor - data_key: fluid_port_6 - description: "阀门端口6,position=6时流体从此口流出" - - handler_key: valve-port-7 - label: Port 7 - data_type: fluid - io_type: source - data_source: executor - data_key: fluid_port_6 - description: "阀门端口7,position=6时流体从此口流出" + data_key: fluid_port + description: "电磁阀的双向流体口,开启时允许流体双向通过,关闭时完全阻断" schema: type: object properties: port: type: string default: "VIRTUAL" - positions: - type: integer - default: 7 + voltage: + type: number + default: 12.0 + response_time: + type: number + default: 0.1 additionalProperties: false - virtual_centrifuge: description: Virtual Centrifuge for CentrifugeProtocol Testing class: