mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 13:01:12 +00:00
bugfixes on organic protocols
This commit is contained in:
@@ -244,7 +244,7 @@ def generate_filter_protocol(
|
||||
# === 收集滤液(如果需要)===
|
||||
debug_print("📍 步骤5: 收集滤液... 💧")
|
||||
|
||||
if filtrate_vessel:
|
||||
if filtrate_vessel_id and filtrate_vessel_id not in G.neighbors(filter_device):
|
||||
debug_print(f" 🧪 收集滤液: {filter_device} → {filtrate_vessel_id} 💧")
|
||||
|
||||
try:
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -125,6 +125,29 @@ def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str:
|
||||
"""
|
||||
debug_print(f"🔍 正在查找溶剂 '{solvent}' 的容器... 🧪")
|
||||
|
||||
# 第四步:通过数据中的试剂信息匹配
|
||||
debug_print(" 🧪 步骤1: 数据试剂信息匹配...")
|
||||
for node_id in G.nodes():
|
||||
debug_print(f"查找 id {node_id}, type={G.nodes[node_id].get('type')}, data={G.nodes[node_id].get('data', {})} 的容器...")
|
||||
if G.nodes[node_id].get('type') == 'container':
|
||||
vessel_data = G.nodes[node_id].get('data', {})
|
||||
|
||||
# 检查 data 中的 reagent_name 字段
|
||||
reagent_name = vessel_data.get('reagent_name', '').lower()
|
||||
if reagent_name and solvent.lower() == reagent_name:
|
||||
debug_print(f" 🎉 通过data.reagent_name匹配找到容器: {node_id} (试剂: {reagent_name}) ✨")
|
||||
return node_id
|
||||
|
||||
# 检查 data 中的液体信息
|
||||
liquids = vessel_data.get('liquid', []) or vessel_data.get('liquids', [])
|
||||
for liquid in liquids:
|
||||
if isinstance(liquid, dict):
|
||||
liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower()
|
||||
|
||||
if solvent.lower() == liquid_type or solvent.lower() in liquid_type:
|
||||
debug_print(f" 🎉 通过液体类型匹配找到容器: {node_id} (液体类型: {liquid_type}) ✨")
|
||||
return node_id
|
||||
|
||||
# 构建可能的容器名称
|
||||
possible_names = [
|
||||
f"flask_{solvent}",
|
||||
@@ -140,14 +163,14 @@ def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str:
|
||||
debug_print(f"📋 候选容器名称: {possible_names[:3]}... (共{len(possible_names)}个) 📝")
|
||||
|
||||
# 第一步:通过容器名称匹配
|
||||
debug_print(" 🎯 步骤1: 精确名称匹配...")
|
||||
debug_print(" 🎯 步骤2: 精确名称匹配...")
|
||||
for vessel_name in possible_names:
|
||||
if vessel_name in G.nodes():
|
||||
debug_print(f" 🎉 通过名称匹配找到容器: {vessel_name} ✨")
|
||||
return vessel_name
|
||||
|
||||
# 第二步:通过模糊匹配(节点ID和名称)
|
||||
debug_print(" 🔍 步骤2: 模糊名称匹配...")
|
||||
debug_print(" 🔍 步骤3: 模糊名称匹配...")
|
||||
for node_id in G.nodes():
|
||||
if G.nodes[node_id].get('type') == 'container':
|
||||
node_name = G.nodes[node_id].get('name', '').lower()
|
||||
@@ -157,7 +180,7 @@ def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str:
|
||||
return node_id
|
||||
|
||||
# 第三步:通过配置中的试剂信息匹配
|
||||
debug_print(" 🧪 步骤3: 配置试剂信息匹配...")
|
||||
debug_print(" 🧪 步骤4: 配置试剂信息匹配...")
|
||||
for node_id in G.nodes():
|
||||
if G.nodes[node_id].get('type') == 'container':
|
||||
# 检查 config 中的 reagent 字段
|
||||
@@ -168,28 +191,6 @@ def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str:
|
||||
debug_print(f" 🎉 通过config.reagent匹配找到容器: {node_id} (试剂: {config_reagent}) ✨")
|
||||
return node_id
|
||||
|
||||
# 第四步:通过数据中的试剂信息匹配
|
||||
debug_print(" 🧪 步骤4: 数据试剂信息匹配...")
|
||||
for node_id in G.nodes():
|
||||
if G.nodes[node_id].get('type') == 'container':
|
||||
vessel_data = G.nodes[node_id].get('data', {})
|
||||
|
||||
# 检查 data 中的 reagent_name 字段
|
||||
reagent_name = vessel_data.get('reagent_name', '').lower()
|
||||
if reagent_name and solvent.lower() == reagent_name:
|
||||
debug_print(f" 🎉 通过data.reagent_name匹配找到容器: {node_id} (试剂: {reagent_name}) ✨")
|
||||
return node_id
|
||||
|
||||
# 检查 data 中的液体信息
|
||||
liquids = vessel_data.get('liquid', [])
|
||||
for liquid in liquids:
|
||||
if isinstance(liquid, dict):
|
||||
liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower()
|
||||
|
||||
if solvent.lower() in liquid_type:
|
||||
debug_print(f" 🎉 通过液体类型匹配找到容器: {node_id} (液体类型: {liquid_type}) ✨")
|
||||
return node_id
|
||||
|
||||
# 第五步:部分匹配(如果前面都没找到)
|
||||
debug_print(" 🔍 步骤5: 部分匹配...")
|
||||
for node_id in G.nodes():
|
||||
|
||||
@@ -21,19 +21,6 @@ class VirtualMultiwayValve:
|
||||
self._current_position = 0 # 默认在0号位(transfer pump位置)
|
||||
self._target_position = 0
|
||||
|
||||
# 位置映射说明
|
||||
self.position_map = {
|
||||
0: "transfer_pump", # 0号位连接转移泵
|
||||
1: "port_1", # 1号位
|
||||
2: "port_2", # 2号位
|
||||
3: "port_3", # 3号位
|
||||
4: "port_4", # 4号位
|
||||
5: "port_5", # 5号位
|
||||
6: "port_6", # 6号位
|
||||
7: "port_7", # 7号位
|
||||
8: "port_8" # 8号位
|
||||
}
|
||||
|
||||
print(f"🔄 === 虚拟多通阀门已创建 === ✨")
|
||||
print(f"🎯 端口: {port} | 📊 位置范围: 0-{self.max_positions} | 🏠 初始位置: 0 (transfer_pump)")
|
||||
self.logger.info(f"🔧 多通阀门初始化: 端口={port}, 最大位置={self.max_positions}")
|
||||
@@ -60,7 +47,7 @@ class VirtualMultiwayValve:
|
||||
|
||||
def get_current_port(self) -> str:
|
||||
"""获取当前连接的端口名称 🔌"""
|
||||
return self.position_map.get(self._current_position, "unknown")
|
||||
return self._current_position
|
||||
|
||||
def set_position(self, command: Union[int, str]):
|
||||
"""
|
||||
@@ -115,7 +102,7 @@ class VirtualMultiwayValve:
|
||||
old_position = self._current_position
|
||||
old_port = self.get_current_port()
|
||||
|
||||
self.logger.info(f"🔄 阀门切换: {old_position}({old_port}) → {pos}({self.position_map.get(pos, 'unknown')}) {pos_emoji}")
|
||||
self.logger.info(f"🔄 阀门切换: {old_position}({old_port}) → {pos} {pos_emoji}")
|
||||
|
||||
self._status = "Busy"
|
||||
self._valve_state = "Moving"
|
||||
@@ -190,6 +177,17 @@ class VirtualMultiwayValve:
|
||||
"""获取阀门位置 - 兼容性方法 📍"""
|
||||
return self._current_position
|
||||
|
||||
def set_valve_position(self, command: Union[int, str]):
|
||||
"""
|
||||
设置阀门位置 - 兼容pump_protocol调用 🎯
|
||||
这是set_position的别名方法,用于兼容pump_protocol.py
|
||||
|
||||
Args:
|
||||
command: 目标位置 (0-8) 或位置字符串
|
||||
"""
|
||||
# 删除debug日志:self.logger.debug(f"🎯 兼容性调用: set_valve_position({command})")
|
||||
return self.set_position(command)
|
||||
|
||||
def is_at_position(self, position: int) -> bool:
|
||||
"""检查是否在指定位置 🎯"""
|
||||
result = self._current_position == position
|
||||
@@ -210,17 +208,6 @@ class VirtualMultiwayValve:
|
||||
# 删除debug日志:self.logger.debug(f"🔌 端口{port_number}检查: {port_status} (当前位置: {self._current_position})")
|
||||
return result
|
||||
|
||||
def get_available_positions(self) -> list:
|
||||
"""获取可用位置列表 📋"""
|
||||
positions = list(range(0, self.max_positions + 1))
|
||||
# 删除debug日志:self.logger.debug(f"📋 可用位置: {positions}")
|
||||
return positions
|
||||
|
||||
def get_available_ports(self) -> Dict[int, str]:
|
||||
"""获取可用端口映射 🗺️"""
|
||||
# 删除debug日志:self.logger.debug(f"🗺️ 端口映射: {self.position_map}")
|
||||
return self.position_map.copy()
|
||||
|
||||
def reset(self):
|
||||
"""重置阀门到transfer pump位置(0号位)🔄"""
|
||||
self.logger.info(f"🔄 重置阀门到泵位置...")
|
||||
@@ -259,17 +246,6 @@ class VirtualMultiwayValve:
|
||||
|
||||
return f"🔄 VirtualMultiwayValve({status_emoji} 位置: {self._current_position}/{self.max_positions}, 端口: {current_port}, 状态: {self._status})"
|
||||
|
||||
def set_valve_position(self, command: Union[int, str]):
|
||||
"""
|
||||
设置阀门位置 - 兼容pump_protocol调用 🎯
|
||||
这是set_position的别名方法,用于兼容pump_protocol.py
|
||||
|
||||
Args:
|
||||
command: 目标位置 (0-8) 或位置字符串
|
||||
"""
|
||||
# 删除debug日志:self.logger.debug(f"🎯 兼容性调用: set_valve_position({command})")
|
||||
return self.set_position(command)
|
||||
|
||||
|
||||
# 使用示例
|
||||
if __name__ == "__main__":
|
||||
@@ -291,10 +267,6 @@ if __name__ == "__main__":
|
||||
print(f"\n🔌 切换到2号位: {valve.set_to_port(2)}")
|
||||
print(f"📍 当前状态: {valve}")
|
||||
|
||||
# 显示所有可用位置
|
||||
print(f"\n📋 可用位置: {valve.get_available_positions()}")
|
||||
print(f"🗺️ 端口映射: {valve.get_available_ports()}")
|
||||
|
||||
# 测试切换功能
|
||||
print(f"\n🔄 智能切换测试:")
|
||||
print(f"当前位置: {valve._current_position}")
|
||||
|
||||
@@ -99,8 +99,8 @@ class VirtualRotavap:
|
||||
self.logger.error(f"❌ 时间参数类型无效: {type(time)},使用默认值180.0秒")
|
||||
time = 180.0
|
||||
|
||||
# 确保time是float类型
|
||||
time = float(time)
|
||||
# 确保time是float类型; 并加速
|
||||
time = float(time) / 10.0
|
||||
|
||||
# 🔧 简化处理:如果vessel就是设备自己,直接操作
|
||||
if vessel == self.device_id:
|
||||
|
||||
@@ -48,20 +48,6 @@ class VirtualSolenoidValve:
|
||||
"""获取阀门位置状态"""
|
||||
return "OPEN" if self._is_open else "CLOSED"
|
||||
|
||||
@property
|
||||
def state(self) -> dict:
|
||||
"""获取阀门完整状态"""
|
||||
return {
|
||||
"device_id": self.device_id,
|
||||
"port": self.port,
|
||||
"voltage": self.voltage,
|
||||
"response_time": self.response_time,
|
||||
"is_open": self._is_open,
|
||||
"valve_state": self._valve_state,
|
||||
"status": self._status,
|
||||
"position": self.valve_position
|
||||
}
|
||||
|
||||
async def set_valve_position(self, command: str = None, **kwargs):
|
||||
"""
|
||||
设置阀门位置 - ROS动作接口
|
||||
|
||||
@@ -2161,8 +2161,6 @@ virtual_multiway_valve:
|
||||
type: SendCmd
|
||||
module: unilabos.devices.virtual.virtual_multiway_valve:VirtualMultiwayValve
|
||||
status_types:
|
||||
available_ports: dict
|
||||
available_positions: list
|
||||
current_port: str
|
||||
current_position: int
|
||||
flow_path: str
|
||||
@@ -2268,10 +2266,6 @@ virtual_multiway_valve:
|
||||
type: object
|
||||
data:
|
||||
properties:
|
||||
available_ports:
|
||||
type: object
|
||||
available_positions:
|
||||
type: array
|
||||
current_port:
|
||||
type: string
|
||||
current_position:
|
||||
@@ -2293,8 +2287,6 @@ virtual_multiway_valve:
|
||||
- target_position
|
||||
- current_port
|
||||
- valve_position
|
||||
- available_positions
|
||||
- available_ports
|
||||
- flow_path
|
||||
type: object
|
||||
version: 1.0.0
|
||||
@@ -3775,7 +3767,6 @@ virtual_solenoid_valve:
|
||||
module: unilabos.devices.virtual.virtual_solenoid_valve:VirtualSolenoidValve
|
||||
status_types:
|
||||
is_open: bool
|
||||
state: dict
|
||||
status: str
|
||||
valve_position: str
|
||||
valve_state: str
|
||||
@@ -3813,8 +3804,6 @@ virtual_solenoid_valve:
|
||||
properties:
|
||||
is_open:
|
||||
type: boolean
|
||||
state:
|
||||
type: object
|
||||
status:
|
||||
type: string
|
||||
valve_position:
|
||||
@@ -3826,7 +3815,6 @@ virtual_solenoid_valve:
|
||||
- valve_state
|
||||
- is_open
|
||||
- valve_position
|
||||
- state
|
||||
type: object
|
||||
version: 1.0.0
|
||||
virtual_solid_dispenser:
|
||||
|
||||
@@ -395,6 +395,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
if "data" not in resource:
|
||||
resource["data"] = {}
|
||||
resource["data"].update(json.loads(container_instance.data))
|
||||
request.resources[0].name = resource["name"]
|
||||
logger.info(f"更新物料{container_query_dict['name']}的数据{resource['data']} dict")
|
||||
else:
|
||||
logger.info(f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}")
|
||||
|
||||
@@ -404,13 +404,14 @@ class HostNode(BaseROS2DeviceNode):
|
||||
class_name: str,
|
||||
parent: str,
|
||||
bind_locations: Point,
|
||||
liquid_input_slot: list[int],
|
||||
liquid_type: list[str],
|
||||
liquid_volume: list[int],
|
||||
slot_on_deck: str,
|
||||
liquid_input_slot: list[int] = [],
|
||||
liquid_type: list[str] = [],
|
||||
liquid_volume: list[int] = [],
|
||||
slot_on_deck: str = "",
|
||||
):
|
||||
# 暂不支持多对同名父子同时存在
|
||||
res_creation_input = {
|
||||
"id": res_id.split("/")[-1],
|
||||
"name": res_id.split("/")[-1],
|
||||
"class": class_name,
|
||||
"parent": parent.split("/")[-1],
|
||||
@@ -424,8 +425,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
res_creation_input.update(
|
||||
{
|
||||
"data": {
|
||||
"liquids": [{
|
||||
"liquid_type": liquid_type[0] if liquid_type else None,
|
||||
"liquid_volume": liquid_volume[0] if liquid_volume else None,
|
||||
}]
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user