Files
Uni-Lab-OS/unilabos/devices/virtual/virtual_multiway_valve.py
Kongchang Feng acf7b6d3f7 修复了部分的protocol因为XDL更新导致的问题 (#61)
* 修复了部分的protocol因为XDL更新导致的问题

但是pumptransfer,add,dissolve,separate还没修,后续还需要写virtual固体加料器

* 补充了四个action

* 添加了固体加样器,丰富了json,修改了add protocol

* bump version to 0.9.9

* fix bugs from new actions

* protocol完整修复版本& bump version to 0.9.10

* 修补了一些单位处理,bump version to 0.9.11

* 优化了全protocol的运行时间,除了pumptransfer相关的还没

* 补充了剩下的几个protocol

---------

Co-authored-by: Junhan Chang <changjh@dp.tech>
Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>
2025-07-16 11:12:02 +08:00

306 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import logging
from typing import Union, Dict, Optional
class VirtualMultiwayValve:
"""
虚拟九通阀门 - 0号位连接transfer pump1-8号位连接其他设备 🔄
"""
def __init__(self, port: str = "VIRTUAL", positions: int = 8):
self.port = port
self.max_positions = positions # 1-8号位
self.total_positions = positions + 1 # 0-8号位共9个位置
# 添加日志记录器
self.logger = logging.getLogger(f"VirtualMultiwayValve.{port}")
# 状态属性
self._status = "Idle"
self._valve_state = "Ready"
self._current_position = 0 # 默认在0号位transfer pump位置
self._target_position = 0
# 位置映射说明
self.position_map = {
0: "transfer_pump", # 0号位连接转移泵
1: "port_1", # 1号位
2: "port_2", # 2号位
3: "port_3", # 3号位
4: "port_4", # 4号位
5: "port_5", # 5号位
6: "port_6", # 6号位
7: "port_7", # 7号位
8: "port_8" # 8号位
}
print(f"🔄 === 虚拟多通阀门已创建 === ✨")
print(f"🎯 端口: {port} | 📊 位置范围: 0-{self.max_positions} | 🏠 初始位置: 0 (transfer_pump)")
self.logger.info(f"🔧 多通阀门初始化: 端口={port}, 最大位置={self.max_positions}")
@property
def status(self) -> str:
return self._status
@property
def valve_state(self) -> str:
return self._valve_state
@property
def current_position(self) -> int:
return self._current_position
@property
def target_position(self) -> int:
return self._target_position
def get_current_position(self) -> int:
"""获取当前阀门位置 📍"""
return self._current_position
def get_current_port(self) -> str:
"""获取当前连接的端口名称 🔌"""
return self.position_map.get(self._current_position, "unknown")
def set_position(self, command: Union[int, str]):
"""
设置阀门位置 - 支持0-8位置 🎯
Args:
command: 目标位置 (0-8) 或位置字符串
0: transfer pump位置
1-8: 其他设备位置
"""
try:
# 如果是字符串形式的位置,先转换为数字
if isinstance(command, str):
pos = int(command)
else:
pos = int(command)
if pos < 0 or pos > self.max_positions:
error_msg = f"位置必须在 0-{self.max_positions} 范围内"
self.logger.error(f"{error_msg}: 请求位置={pos}")
raise ValueError(error_msg)
# 获取位置描述emoji
if pos == 0:
pos_emoji = "🚰"
pos_desc = "泵位置"
else:
pos_emoji = "🔌"
pos_desc = f"端口{pos}"
old_position = self._current_position
old_port = self.get_current_port()
self.logger.info(f"🔄 阀门切换: {old_position}({old_port}) → {pos}({self.position_map.get(pos, 'unknown')}) {pos_emoji}")
self._status = "Busy"
self._valve_state = "Moving"
self._target_position = pos
# 模拟阀门切换时间
switch_time = abs(self._current_position - pos) * 0.5 # 每个位置0.5秒
if switch_time > 0:
self.logger.info(f"⏱️ 阀门移动中... 预计用时: {switch_time:.1f}秒 🔄")
time.sleep(switch_time)
self._current_position = pos
self._status = "Idle"
self._valve_state = "Ready"
current_port = self.get_current_port()
success_msg = f"✅ 阀门已切换到位置 {pos} ({current_port}) {pos_emoji}"
self.logger.info(success_msg)
return success_msg
except ValueError as e:
error_msg = f"❌ 阀门切换失败: {str(e)}"
self._status = "Error"
self._valve_state = "Error"
self.logger.error(error_msg)
return error_msg
def set_to_pump_position(self):
"""切换到transfer pump位置0号位🚰"""
self.logger.info(f"🚰 切换到泵位置...")
return self.set_position(0)
def set_to_port(self, port_number: int):
"""
切换到指定端口位置 🔌
Args:
port_number: 端口号 (1-8)
"""
if port_number < 1 or port_number > self.max_positions:
error_msg = f"端口号必须在 1-{self.max_positions} 范围内"
self.logger.error(f"{error_msg}: 请求端口={port_number}")
raise ValueError(error_msg)
self.logger.info(f"🔌 切换到端口 {port_number}...")
return self.set_position(port_number)
def open(self):
"""打开阀门 - 设置到transfer pump位置0号位🔓"""
self.logger.info(f"🔓 打开阀门,设置到泵位置...")
return self.set_to_pump_position()
def close(self):
"""关闭阀门 - 对于多通阀门,设置到一个"关闭"状态 🔒"""
self.logger.info(f"🔒 关闭阀门...")
self._status = "Busy"
self._valve_state = "Closing"
time.sleep(0.5)
# 可以选择保持当前位置或设置特殊关闭状态
self._status = "Idle"
self._valve_state = "Closed"
close_msg = f"🔒 阀门已关闭,保持在位置 {self._current_position} ({self.get_current_port()})"
self.logger.info(close_msg)
return close_msg
def get_valve_position(self) -> int:
"""获取阀门位置 - 兼容性方法 📍"""
return self._current_position
def is_at_position(self, position: int) -> bool:
"""检查是否在指定位置 🎯"""
result = self._current_position == position
# 删除debug日志self.logger.debug(f"🎯 位置检查: 当前={self._current_position}, 目标={position}, 匹配={result}")
return result
def is_at_pump_position(self) -> bool:
"""检查是否在transfer pump位置 🚰"""
result = self._current_position == 0
# 删除debug日志pump_status = "是" if result else "否"
# 删除debug日志self.logger.debug(f"🚰 泵位置检查: {pump_status} (当前位置: {self._current_position})")
return result
def is_at_port(self, port_number: int) -> bool:
"""检查是否在指定端口位置 🔌"""
result = self._current_position == port_number
# 删除debug日志port_status = "是" if result else "否"
# 删除debug日志self.logger.debug(f"🔌 端口{port_number}检查: {port_status} (当前位置: {self._current_position})")
return result
def get_available_positions(self) -> list:
"""获取可用位置列表 📋"""
positions = list(range(0, self.max_positions + 1))
# 删除debug日志self.logger.debug(f"📋 可用位置: {positions}")
return positions
def get_available_ports(self) -> Dict[int, str]:
"""获取可用端口映射 🗺️"""
# 删除debug日志self.logger.debug(f"🗺️ 端口映射: {self.position_map}")
return self.position_map.copy()
def reset(self):
"""重置阀门到transfer pump位置0号位🔄"""
self.logger.info(f"🔄 重置阀门到泵位置...")
return self.set_position(0)
def switch_between_pump_and_port(self, port_number: int):
"""
在transfer pump位置和指定端口之间切换 🔄
Args:
port_number: 目标端口号 (1-8)
"""
if self._current_position == 0:
# 当前在pump位置切换到指定端口
self.logger.info(f"🔄 从泵位置切换到端口 {port_number}...")
return self.set_to_port(port_number)
else:
# 当前在某个端口切换到pump位置
self.logger.info(f"🔄 从端口 {self._current_position} 切换到泵位置...")
return self.set_to_pump_position()
def get_flow_path(self) -> str:
"""获取当前流路路径描述 🌊"""
current_port = self.get_current_port()
if self._current_position == 0:
flow_path = f"🚰 转移泵已连接 (位置 {self._current_position})"
else:
flow_path = f"🔌 端口 {self._current_position} 已连接 ({current_port})"
# 删除debug日志self.logger.debug(f"🌊 当前流路: {flow_path}")
return flow_path
def get_info(self) -> dict:
"""获取阀门详细信息 📊"""
info = {
"port": self.port,
"max_positions": self.max_positions,
"total_positions": self.total_positions,
"current_position": self._current_position,
"current_port": self.get_current_port(),
"target_position": self._target_position,
"status": self._status,
"valve_state": self._valve_state,
"flow_path": self.get_flow_path(),
"position_map": self.position_map
}
# 删除debug日志self.logger.debug(f"📊 阀门信息: 位置={self._current_position}, 状态={self._status}, 端口={self.get_current_port()}")
return info
def __str__(self):
current_port = self.get_current_port()
status_emoji = "" if self._status == "Idle" else "🔄" if self._status == "Busy" else ""
return f"🔄 VirtualMultiwayValve({status_emoji} 位置: {self._current_position}/{self.max_positions}, 端口: {current_port}, 状态: {self._status})"
def set_valve_position(self, command: Union[int, str]):
"""
设置阀门位置 - 兼容pump_protocol调用 🎯
这是set_position的别名方法用于兼容pump_protocol.py
Args:
command: 目标位置 (0-8) 或位置字符串
"""
# 删除debug日志self.logger.debug(f"🎯 兼容性调用: set_valve_position({command})")
return self.set_position(command)
# 使用示例
if __name__ == "__main__":
valve = VirtualMultiwayValve()
print("🔄 === 虚拟九通阀门测试 === ✨")
print(f"🏠 初始状态: {valve}")
print(f"🌊 当前流路: {valve.get_flow_path()}")
# 切换到试剂瓶11号位
print(f"\n🔌 切换到1号位: {valve.set_position(1)}")
print(f"📍 当前状态: {valve}")
# 切换到transfer pump位置0号位
print(f"\n🚰 切换到pump位置: {valve.set_to_pump_position()}")
print(f"📍 当前状态: {valve}")
# 切换到试剂瓶22号位
print(f"\n🔌 切换到2号位: {valve.set_to_port(2)}")
print(f"📍 当前状态: {valve}")
# 显示所有可用位置
print(f"\n📋 可用位置: {valve.get_available_positions()}")
print(f"🗺️ 端口映射: {valve.get_available_ports()}")
# 获取详细信息
print(f"\n📊 详细信息: {valve.get_info()}")
# 测试切换功能
print(f"\n🔄 智能切换测试:")
print(f"当前位置: {valve._current_position}")
print(f"切换结果: {valve.switch_between_pump_and_port(3)}")
print(f"新位置: {valve._current_position}")
# 重置测试
print(f"\n🔄 重置测试: {valve.reset()}")
print(f"📍 重置后状态: {valve}")