From ea60cbe891ca2746b9be654c782f6a15b46a1a29 Mon Sep 17 00:00:00 2001 From: Junhan Chang Date: Tue, 12 Aug 2025 14:50:01 +0800 Subject: [PATCH] bugfixes on organic protocols --- .../comprehensive_station.json | 88 +- unilabos/compile/filter_protocol.py | 2 +- unilabos/compile/pump_protocol.py | 1502 +++-------------- unilabos/compile/utils/vessel_parser.py | 51 +- .../devices/virtual/virtual_multiway_valve.py | 54 +- unilabos/devices/virtual/virtual_rotavap.py | 4 +- .../devices/virtual/virtual_solenoid_valve.py | 14 - unilabos/registry/devices/virtual_device.yaml | 12 - unilabos/ros/nodes/base_device_node.py | 1 + unilabos/ros/nodes/presets/host_node.py | 11 +- 10 files changed, 378 insertions(+), 1361 deletions(-) diff --git a/test/experiments/comprehensive_protocol/comprehensive_station.json b/test/experiments/comprehensive_protocol/comprehensive_station.json index ed35379b..1da0d1df 100644 --- a/test/experiments/comprehensive_protocol/comprehensive_station.json +++ b/test/experiments/comprehensive_protocol/comprehensive_station.json @@ -170,12 +170,15 @@ "z": 0 }, "config": { - "volume": 1000.0, - "reagent": "DMF" + "max_volume": 1000.0 }, "data": { - "current_volume": 1000.0, - "reagent_name": "DMF" + "liquids": [ + { + "liquid_type": "DMF", + "liquid_volume": 1000.0 + } + ] } }, { @@ -191,12 +194,15 @@ "z": 0 }, "config": { - "volume": 1000.0, - "reagent": "ethyl_acetate" + "max_volume": 1000.0 }, "data": { - "current_volume": 1000.0, - "reagent_name": "ethyl_acetate" + "liquids": [ + { + "liquid_type": "ethyl_acetate", + "liquid_volume": 1000.0 + } + ] } }, { @@ -212,12 +218,15 @@ "z": 0 }, "config": { - "volume": 1000.0, - "reagent": "hexane" + "max_volume": 1000.0 }, "data": { - "current_volume": 1000.0, - "reagent_name": "hexane" + "liquids": [ + { + "liquid_type": "hexane", + "liquid_volume": 1000.0 + } + ] } }, { @@ -233,12 +242,15 @@ "z": 0 }, "config": { - "volume": 1000.0, - "reagent": "methanol" + "max_volume": 1000.0 }, "data": { - "current_volume": 1000.0, - "reagent_name": "methanol" + "liquids": [ + { + "liquid_type": "methanol", + "liquid_volume": 1000.0 + } + ] } }, { @@ -254,12 +266,15 @@ "z": 0 }, "config": { - "volume": 1000.0, - "reagent": "water" + "max_volume": 1000.0 }, "data": { - "current_volume": 1000.0, - "reagent_name": "water" + "liquids": [ + { + "liquid_type": "water", + "liquid_volume": 1000.0 + } + ] } }, { @@ -319,15 +334,15 @@ "z": 0 }, "config": { - "volume": 500.0, + "max_volume": 500.0, "max_temp": 200.0, "min_temp": -20.0, "has_stirrer": true, "has_heater": true }, "data": { - "current_volume": 0.0, - "current_temp": 25.0 + "liquids": [ + ] } }, { @@ -404,10 +419,11 @@ "z": 0 }, "config": { - "volume": 2000.0 + "max_volume": 2000.0 }, "data": { - "current_volume": 0.0 + "liquids": [ + ] } }, { @@ -423,10 +439,11 @@ "z": 0 }, "config": { - "volume": 2000.0 + "max_volume": 2000.0 }, "data": { - "current_volume": 0.0 + "liquids": [ + ] } }, { @@ -632,10 +649,11 @@ "z": 0 }, "config": { - "volume": 250.0 + "max_volume": 250.0 }, "data": { - "current_volume": 0.0 + "liquids": [ + ] } }, { @@ -651,10 +669,11 @@ "z": 0 }, "config": { - "volume": 250.0 + "max_volume": 250.0 }, "data": { - "current_volume": 0.0 + "liquids": [ + ] } }, { @@ -670,10 +689,11 @@ "z": 0 }, "config": { - "volume": 250.0 + "max_volume": 250.0 }, "data": { - "current_volume": 0.0 + "liquids": [ + ] } }, { @@ -712,7 +732,7 @@ "z": 0 }, "config": { - "volume": 500.0, + "max_volume": 500.0, "reagent": "sodium_chloride", "physical_state": "solid" }, diff --git a/unilabos/compile/filter_protocol.py b/unilabos/compile/filter_protocol.py index 8e5efd96..063794df 100644 --- a/unilabos/compile/filter_protocol.py +++ b/unilabos/compile/filter_protocol.py @@ -244,7 +244,7 @@ def generate_filter_protocol( # === 收集滤液(如果需要)=== debug_print("📍 步骤5: 收集滤液... 💧") - if filtrate_vessel: + if filtrate_vessel_id and filtrate_vessel_id not in G.neighbors(filter_device): debug_print(f" 🧪 收集滤液: {filter_device} → {filtrate_vessel_id} 💧") try: diff --git a/unilabos/compile/pump_protocol.py b/unilabos/compile/pump_protocol.py index f8297f41..7215fc5b 100644 --- a/unilabos/compile/pump_protocol.py +++ b/unilabos/compile/pump_protocol.py @@ -1,3 +1,4 @@ +import traceback import numpy as np import networkx as nx import asyncio @@ -6,40 +7,44 @@ from typing import List, Dict, Any import logging import sys +from unilabos.compile.utils.vessel_parser import get_vessel + logger = logging.getLogger(__name__) + def debug_print(message): """强制输出调试信息""" output = f"[TRANSFER] {message}" logger.info(output) + def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: """ 从容器节点的数据中获取液体体积 """ debug_print(f"🔍 开始读取容器 '{vessel}' 的液体体积...") - + if vessel not in G.nodes(): logger.error(f"❌ 容器 '{vessel}' 不存在于系统图中") debug_print(f" - 系统中的容器: {list(G.nodes())}") return 0.0 - + vessel_data = G.nodes[vessel].get('data', {}) debug_print(f"📋 容器 '{vessel}' 的数据结构: {vessel_data}") - + total_volume = 0.0 - + # 方法1:检查 'liquid' 字段(列表格式) debug_print("🔍 方法1: 检查 'liquid' 字段...") if 'liquid' in vessel_data: liquids = vessel_data['liquid'] debug_print(f" - liquid 字段类型: {type(liquids)}") debug_print(f" - liquid 字段内容: {liquids}") - + if isinstance(liquids, list): debug_print(f" - liquid 是列表,包含 {len(liquids)} 个元素") for i, liquid in enumerate(liquids): - debug_print(f" 液体 {i+1}: {liquid}") + debug_print(f" 液体 {i + 1}: {liquid}") if isinstance(liquid, dict): volume_keys = ['liquid_volume', 'volume', 'amount', 'quantity'] for key in volume_keys: @@ -56,7 +61,7 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: debug_print(f" - liquid 不是列表: {type(liquids)}") else: debug_print(" - 没有 'liquid' 字段") - + # 方法2:检查直接的体积字段 debug_print("🔍 方法2: 检查直接体积字段...") volume_keys = ['total_volume', 'volume', 'liquid_volume', 'amount', 'current_volume'] @@ -70,7 +75,7 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: except (ValueError, TypeError) as e: logger.warning(f" ⚠️ 无法转换 '{key}': {vessel_data[key]} -> {str(e)}") continue - + # 方法3:检查 'state' 或 'status' 字段 debug_print("🔍 方法3: 检查 'state' 字段...") if 'state' in vessel_data and isinstance(vessel_data['state'], dict): @@ -85,10 +90,11 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: logger.warning(f" ⚠️ 无法转换 state.volume: {state['volume']} -> {str(e)}") else: debug_print(" - 没有 'state' 字段或不是字典") - + debug_print(f"📊 容器 '{vessel}' 最终检测体积: {total_volume}mL") return total_volume + def is_integrated_pump(node_name): return "pump" in node_name and "valve" in node_name @@ -99,42 +105,42 @@ def find_connected_pump(G, valve_node): 🔧 修复:区分电磁阀和多通阀,电磁阀不参与泵查找 """ debug_print(f"🔍 查找与阀门 {valve_node} 相连的泵...") - + # 🔧 关键修复:检查节点类型,电磁阀不应该查找泵 node_data = G.nodes.get(valve_node, {}) node_class = node_data.get("class", "") or "" - + debug_print(f" - 阀门类型: {node_class}") - + # 如果是电磁阀,不应该查找泵(电磁阀只是开关) if ("solenoid" in node_class.lower() or "solenoid_valve" in valve_node.lower()): debug_print(f" ⚠️ {valve_node} 是电磁阀,不应该查找泵节点") raise ValueError(f"电磁阀 {valve_node} 不应该参与泵查找逻辑") - + # 只有多通阀等复杂阀门才需要查找连接的泵 if ("multiway" in node_class.lower() or "valve" in node_class.lower()): debug_print(f" - {valve_node} 是多通阀,查找连接的泵...") - # 方法1:直接相邻的泵 for neighbor in G.neighbors(valve_node): neighbor_class = G.nodes[neighbor].get("class", "") or "" + # 排除非 电磁阀 和 泵 的邻居 debug_print(f" - 检查邻居 {neighbor}, class: {neighbor_class}") if "pump" in neighbor_class.lower(): debug_print(f" ✅ 找到直接相连的泵: {neighbor}") return neighbor - + # 方法2:通过路径查找泵(最多2跳) debug_print(f" - 未找到直接相连的泵,尝试路径查找...") - + # 获取所有泵节点 pump_nodes = [] for node_id in G.nodes(): node_class = G.nodes[node_id].get("class", "") or "" if "pump" in node_class.lower(): pump_nodes.append(node_id) - + debug_print(f" - 系统中的泵节点: {pump_nodes}") - + # 查找到泵的最短路径 for pump_node in pump_nodes: try: @@ -142,18 +148,13 @@ def find_connected_pump(G, valve_node): path = nx.shortest_path(G, valve_node, pump_node) path_length = len(path) - 1 debug_print(f" - 到泵 {pump_node} 的路径: {path}, 距离: {path_length}") - + if path_length <= 2: # 最多允许2跳 debug_print(f" ✅ 通过路径找到泵: {pump_node}") return pump_node except nx.NetworkXNoPath: continue - - # 方法3:降级方案 - 返回第一个可用的泵 - if pump_nodes: - debug_print(f" ⚠️ 未找到连接的泵,使用第一个可用的泵: {pump_nodes[0]}") - return pump_nodes[0] - + # 最终失败 debug_print(f" ❌ 完全找不到泵节点") raise ValueError(f"未找到与阀 {valve_node} 相连的泵节点") @@ -166,26 +167,26 @@ def build_pump_valve_maps(G, pump_backbone): """ pumps_from_node = {} valve_from_node = {} - + debug_print(f"🔧 构建泵-阀门映射,原始骨架: {pump_backbone}") - + # 🔧 关键修复:过滤掉电磁阀 filtered_backbone = [] for node in pump_backbone: node_data = G.nodes.get(node, {}) node_class = node_data.get("class", "") or "" - + # 跳过电磁阀 if ("solenoid" in node_class.lower() or "solenoid_valve" in node.lower()): debug_print(f" - 跳过电磁阀: {node}") continue - + filtered_backbone.append(node) - + debug_print(f"🔧 过滤后的骨架: {filtered_backbone}") - + for node in filtered_backbone: - if is_integrated_pump(node): + if is_integrated_pump(G.nodes[node]["class"]): pumps_from_node[node] = node valve_from_node[node] = node debug_print(f" - 集成泵-阀: {node}") @@ -198,18 +199,18 @@ def build_pump_valve_maps(G, pump_backbone): except ValueError as e: debug_print(f" - 跳过节点 {node}: {str(e)}") continue - + debug_print(f"🔧 最终映射: pumps={pumps_from_node}, valves={valve_from_node}") return pumps_from_node, valve_from_node def generate_pump_protocol( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + volume: float, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, ) -> List[Dict[str, Any]]: """ 生成泵操作的动作序列 - 修复版本 @@ -217,54 +218,55 @@ def generate_pump_protocol( """ pump_action_sequence = [] nodes = G.nodes(data=True) - + # 验证输入参数 if volume <= 0: logger.error(f"无效的体积参数: {volume}mL") return pump_action_sequence - + if flowrate <= 0: flowrate = 2.5 logger.warning(f"flowrate <= 0,使用默认值 {flowrate}mL/s") - + if transfer_flowrate <= 0: transfer_flowrate = 0.5 logger.warning(f"transfer_flowrate <= 0,使用默认值 {transfer_flowrate}mL/s") - + # 验证容器存在 - if from_vessel not in G.nodes(): - logger.error(f"源容器 '{from_vessel}' 不存在") + debug_print(f"🔍 验证源容器 '{from_vessel_id}' 和目标容器 '{to_vessel_id}' 是否存在...") + if from_vessel_id not in G.nodes(): + logger.error(f"源容器 '{from_vessel_id}' 不存在") return pump_action_sequence - - if to_vessel not in G.nodes(): - logger.error(f"目标容器 '{to_vessel}' 不存在") + + if to_vessel_id not in G.nodes(): + logger.error(f"目标容器 '{to_vessel_id}' 不存在") return pump_action_sequence - + try: - shortest_path = nx.shortest_path(G, source=from_vessel, target=to_vessel) - debug_print(f"PUMP_TRANSFER: 路径 {from_vessel} -> {to_vessel}: {shortest_path}") + shortest_path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) + debug_print(f"PUMP_TRANSFER: 路径 {from_vessel_id} -> {to_vessel_id}: {shortest_path}") except nx.NetworkXNoPath: - logger.error(f"无法找到从 '{from_vessel}' 到 '{to_vessel}' 的路径") + logger.error(f"无法找到从 '{from_vessel_id}' 到 '{to_vessel_id}' 的路径") return pump_action_sequence # 🔧 关键修复:正确构建泵骨架,排除容器和电磁阀 pump_backbone = [] for node in shortest_path: # 跳过起始和结束容器 - if node == from_vessel or node == to_vessel: + if node == from_vessel_id or node == to_vessel_id: continue - + # 跳过电磁阀(电磁阀不参与泵操作) node_data = G.nodes.get(node, {}) node_class = node_data.get("class", "") or "" if ("solenoid" in node_class.lower() or "solenoid_valve" in node.lower()): debug_print(f"PUMP_TRANSFER: 跳过电磁阀 {node}") continue - + # 只包含多通阀和泵 if ("multiway" in node_class.lower() or "valve" in node_class.lower() or "pump" in node_class.lower()): pump_backbone.append(node) - + debug_print(f"PUMP_TRANSFER: 过滤后的泵骨架: {pump_backbone}") if not pump_backbone: @@ -295,7 +297,7 @@ def generate_pump_protocol( max_volume = pump_config.get("max_volume") if max_volume is not None: min_transfer_volumes.append(max_volume) - + if min_transfer_volumes: min_transfer_volume = min(min_transfer_volumes) else: @@ -306,8 +308,8 @@ def generate_pump_protocol( min_transfer_volume = 25.0 # 默认值 repeats = int(np.ceil(volume / min_transfer_volume)) - - if repeats > 1 and (from_vessel.startswith("pump") or to_vessel.startswith("pump")): + + if repeats > 1 and (from_vessel_id.startswith("pump") or to_vessel_id.startswith("pump")): logger.error("Cannot transfer volume larger than min_transfer_volume between two pumps.") return pump_action_sequence @@ -323,7 +325,7 @@ def generate_pump_protocol( def create_progress_log_action(message: str) -> Dict[str, Any]: """创建一个特殊的等待动作,在执行时打印进度日志""" return { - "action_name": "wait", + "action_name": "wait", "action_kwargs": { "time": 0.1, # 很短的等待时间 "progress_message": message # 自定义字段,用于进度日志 @@ -333,12 +335,12 @@ def generate_pump_protocol( # 生成泵操作序列 for i in range(repeats): current_volume = min(volume_left, min_transfer_volume) - + # 🆕 在每次循环开始时添加进度日志 if repeats > 1: - start_message = f"🚀 准备开始第 {i+1}/{repeats} 次转移: {current_volume:.2f}mL ({from_vessel} → {to_vessel}) 🚰" + start_message = f"🚀 准备开始第 {i + 1}/{repeats} 次转移: {current_volume:.2f}mL ({from_vessel_id} → {to_vessel_id}) 🚰" pump_action_sequence.append(create_progress_log_action(start_message)) - + # 🔧 修复:安全地获取边数据 def get_safe_edge_data(node_a, node_b, key): try: @@ -351,12 +353,12 @@ def generate_pump_protocol( except Exception as e: debug_print(f"PUMP_TRANSFER: 获取边数据失败 {node_a}->{node_b}: {str(e)}") return "default" - + # 从源容器吸液 - if not from_vessel.startswith("pump") and pump_backbone: + if not from_vessel_id.startswith("pump") and pump_backbone: first_pump_node = pump_backbone[0] if first_pump_node in valve_from_node and first_pump_node in pumps_from_node: - port_command = get_safe_edge_data(first_pump_node, from_vessel, first_pump_node) + port_command = get_safe_edge_data(first_pump_node, from_vessel_id, first_pump_node) pump_action_sequence.extend([ { "device_id": valve_from_node[first_pump_node], @@ -375,13 +377,13 @@ def generate_pump_protocol( } ]) pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) - + # 泵间转移 for nodeA, nodeB in zip(pump_backbone[:-1], pump_backbone[1:]): if nodeA in valve_from_node and nodeB in valve_from_node and nodeA in pumps_from_node and nodeB in pumps_from_node: port_a = get_safe_edge_data(nodeA, nodeB, nodeA) port_b = get_safe_edge_data(nodeB, nodeA, nodeB) - + pump_action_sequence.append([ { "device_id": valve_from_node[nodeA], @@ -419,10 +421,10 @@ def generate_pump_protocol( pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) # 排液到目标容器 - if not to_vessel.startswith("pump") and pump_backbone: + if not to_vessel_id.startswith("pump") and pump_backbone: last_pump_node = pump_backbone[-1] if last_pump_node in valve_from_node and last_pump_node in pumps_from_node: - port_command = get_safe_edge_data(last_pump_node, to_vessel, last_pump_node) + port_command = get_safe_edge_data(last_pump_node, to_vessel_id, last_pump_node) pump_action_sequence.extend([ { "device_id": valve_from_node[last_pump_node], @@ -446,70 +448,74 @@ def generate_pump_protocol( if repeats > 1: remaining_volume = volume_left - current_volume if remaining_volume > 0: - end_message = f"✅ 第 {i+1}/{repeats} 次转移完成! 剩余 {remaining_volume:.2f}mL 待转移 ⏳" + end_message = f"✅ 第 {i + 1}/{repeats} 次转移完成! 剩余 {remaining_volume:.2f}mL 待转移 ⏳" else: - end_message = f"🎉 第 {i+1}/{repeats} 次转移完成! 全部 {volume:.2f}mL 转移完毕 ✨" - + end_message = f"🎉 第 {i + 1}/{repeats} 次转移完成! 全部 {volume:.2f}mL 转移完毕 ✨" + pump_action_sequence.append(create_progress_log_action(end_message)) volume_left -= current_volume - + return pump_action_sequence +# 保持原有的同步版本兼容性 def generate_pump_protocol_with_rinsing( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float = 0.0, - amount: str = "", - time: float = 0.0, # 🔧 修复:统一使用 time - viscous: bool = False, - rinsing_solvent: str = "", - rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, - solid: bool = False, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, - rate_spec: str = "", - event: str = "", - through: str = "", - **kwargs + G: nx.DiGraph, + from_vessel: dict, + to_vessel: dict, + volume: float = 0.0, + amount: str = "", + time: float = 0.0, + viscous: bool = False, + rinsing_solvent: str = "", + rinsing_volume: float = 0.0, + rinsing_repeats: int = 0, + solid: bool = False, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, + rate_spec: str = "", + event: str = "", + through: str = "", + **kwargs ) -> List[Dict[str, Any]]: """ 原有的同步版本,添加防冲突机制 """ - + # 添加执行锁,防止并发调用 import threading if not hasattr(generate_pump_protocol_with_rinsing, '_lock'): generate_pump_protocol_with_rinsing._lock = threading.Lock() - + + from_vessel_id, _ = get_vessel(from_vessel) + to_vessel_id, _ = get_vessel(to_vessel) + with generate_pump_protocol_with_rinsing._lock: debug_print("=" * 60) debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (同步版本)") - debug_print(f" 📍 路径: {from_vessel} -> {to_vessel}") + debug_print(f" 📍 路径: {from_vessel_id} -> {to_vessel_id}") debug_print(f" 🕐 时间戳: {time_module.time()}") debug_print(f" 🔒 获得执行锁") debug_print("=" * 60) - + # 短暂延迟,避免快速重复调用 time_module.sleep(0.01) - + debug_print("🔍 步骤1: 开始体积处理...") - + # 1. 处理体积参数 final_volume = volume debug_print(f"📋 初始设置: final_volume = {final_volume}") - + # 🔧 修复:如果volume为0(ROS2传入的空值),从容器读取实际体积 if volume == 0.0: debug_print("🎯 检测到 volume=0.0,开始自动体积检测...") - + # 直接从源容器读取实际体积 - actual_volume = get_vessel_liquid_volume(G, from_vessel) - debug_print(f"📖 从容器 '{from_vessel}' 读取到体积: {actual_volume}mL") - + actual_volume = get_vessel_liquid_volume(G, from_vessel_id) + debug_print(f"📖 从容器 '{from_vessel_id}' 读取到体积: {actual_volume}mL") + if actual_volume > 0: final_volume = actual_volume debug_print(f"✅ 成功设置体积为: {final_volume}mL") @@ -518,65 +524,65 @@ def generate_pump_protocol_with_rinsing( logger.warning(f"⚠️ 无法从容器读取体积,使用默认值: {final_volume}mL") else: debug_print(f"📌 体积非零,直接使用: {final_volume}mL") - + # 处理 amount 参数 if amount and amount.strip(): debug_print(f"🔍 检测到 amount 参数: '{amount}',开始解析...") parsed_volume = _parse_amount_to_volume(amount) debug_print(f"📖 从 amount 解析得到体积: {parsed_volume}mL") - + if parsed_volume > 0: final_volume = parsed_volume debug_print(f"✅ 使用从 amount 解析的体积: {final_volume}mL") elif parsed_volume == 0.0 and amount.lower().strip() == "all": debug_print("🎯 检测到 amount='all',从容器读取全部体积...") - actual_volume = get_vessel_liquid_volume(G, from_vessel) + actual_volume = get_vessel_liquid_volume(G, from_vessel_id) if actual_volume > 0: final_volume = actual_volume debug_print(f"✅ amount='all',设置体积为: {final_volume}mL") - + # 最终体积验证 debug_print(f"🔍 步骤2: 最终体积验证...") if final_volume <= 0: logger.error(f"❌ 体积无效: {final_volume}mL") final_volume = 10.0 logger.warning(f"⚠️ 强制设置为默认值: {final_volume}mL") - + debug_print(f"✅ 最终确定体积: {final_volume}mL") - + # 2. 处理流速参数 debug_print(f"🔍 步骤3: 处理流速参数...") debug_print(f" - 原始 flowrate: {flowrate}") debug_print(f" - 原始 transfer_flowrate: {transfer_flowrate}") - + final_flowrate = flowrate if flowrate > 0 else 2.5 final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 - + if flowrate <= 0: logger.warning(f"⚠️ flowrate <= 0,修正为: {final_flowrate}mL/s") if transfer_flowrate <= 0: logger.warning(f"⚠️ transfer_flowrate <= 0,修正为: {final_transfer_flowrate}mL/s") - + debug_print(f"✅ 修正后流速: flowrate={final_flowrate}mL/s, transfer_flowrate={final_transfer_flowrate}mL/s") - + # 3. 根据时间计算流速 if time > 0 and final_volume > 0: debug_print(f"🔍 步骤4: 根据时间计算流速...") calculated_flowrate = final_volume / time debug_print(f" - 计算得到流速: {calculated_flowrate}mL/s") - + if flowrate <= 0 or flowrate == 2.5: final_flowrate = min(calculated_flowrate, 10.0) debug_print(f" - 调整 flowrate 为: {final_flowrate}mL/s") if transfer_flowrate <= 0 or transfer_flowrate == 0.5: final_transfer_flowrate = min(calculated_flowrate, 5.0) debug_print(f" - 调整 transfer_flowrate 为: {final_transfer_flowrate}mL/s") - + # 4. 根据速度规格调整 if rate_spec: debug_print(f"🔍 步骤5: 根据速度规格调整...") debug_print(f" - 速度规格: '{rate_spec}'") - + if rate_spec == "dropwise": final_flowrate = min(final_flowrate, 0.1) final_transfer_flowrate = min(final_transfer_flowrate, 0.1) @@ -589,545 +595,138 @@ def generate_pump_protocol_with_rinsing( final_flowrate = max(final_flowrate, 5.0) final_transfer_flowrate = max(final_transfer_flowrate, 2.0) debug_print(f" - quickly模式,流速调整为: {final_flowrate}mL/s") - + + # 5. 处理冲洗参数 + debug_print(f"🔍 步骤6: 处理冲洗参数...") + final_rinsing_solvent = rinsing_solvent + final_rinsing_volume = rinsing_volume if rinsing_volume > 0 else 5.0 + final_rinsing_repeats = rinsing_repeats if rinsing_repeats > 0 else 2 + + if rinsing_volume <= 0: + logger.warning(f"⚠️ rinsing_volume <= 0,修正为: {final_rinsing_volume}mL") + if rinsing_repeats <= 0: + logger.warning(f"⚠️ rinsing_repeats <= 0,修正为: {final_rinsing_repeats}次") + + # 根据物理属性调整冲洗参数 + if viscous or solid: + final_rinsing_repeats = max(final_rinsing_repeats, 3) + final_rinsing_volume = max(final_rinsing_volume, 10.0) + debug_print(f"🧪 粘稠/固体物质,调整冲洗参数:{final_rinsing_repeats}次,{final_rinsing_volume}mL") + + # 参数总结 + debug_print("📊 最终参数总结:") + debug_print(f" - 体积: {final_volume}mL") + debug_print(f" - 流速: {final_flowrate}mL/s") + debug_print(f" - 转移流速: {final_transfer_flowrate}mL/s") + debug_print(f" - 冲洗溶剂: '{final_rinsing_solvent}'") + debug_print(f" - 冲洗体积: {final_rinsing_volume}mL") + debug_print(f" - 冲洗次数: {final_rinsing_repeats}次") + + # ========== 执行基础转移 ========== + + debug_print("🔧 步骤7: 开始执行基础转移...") + + try: + debug_print(f" - 调用 generate_pump_protocol...") + debug_print( + f" - 参数: G, '{from_vessel_id}', '{to_vessel_id}', {final_volume}, {final_flowrate}, {final_transfer_flowrate}") + + pump_action_sequence = generate_pump_protocol( + G, from_vessel_id, to_vessel_id, final_volume, + final_flowrate, final_transfer_flowrate + ) + + debug_print(f" - generate_pump_protocol 返回结果:") + debug_print(f" - 动作序列长度: {len(pump_action_sequence)}") + debug_print(f" - 动作序列是否为空: {len(pump_action_sequence) == 0}") + + if not pump_action_sequence: + debug_print("❌ 基础转移协议生成为空,可能是路径问题") + debug_print(f" - 源容器存在: {from_vessel_id in G.nodes()}") + debug_print(f" - 目标容器存在: {to_vessel_id in G.nodes()}") + + if from_vessel_id in G.nodes() and to_vessel_id in G.nodes(): + try: + path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) + debug_print(f" - 路径存在: {path}") + except Exception as path_error: + debug_print(f" - 无法找到路径: {str(path_error)}") + + return [ + { + "device_id": "system", + "action_name": "log_message", + "action_kwargs": { + "message": f"⚠️ 路径问题,无法转移: {final_volume}mL 从 {from_vessel_id} 到 {to_vessel_id}" + } + } + ] + + debug_print(f"✅ 基础转移生成了 {len(pump_action_sequence)} 个动作") + + # 打印前几个动作用于调试 + if len(pump_action_sequence) > 0: + debug_print("🔍 前几个动作预览:") + for i, action in enumerate(pump_action_sequence[:3]): + debug_print(f" 动作 {i + 1}: {action}") + if len(pump_action_sequence) > 3: + debug_print(f" ... 还有 {len(pump_action_sequence) - 3} 个动作") + + except Exception as e: + debug_print(f"❌ 基础转移失败: {str(e)}") + import traceback + debug_print(f"详细错误: {traceback.format_exc()}") + return [ + { + "device_id": "system", + "action_name": "log_message", + "action_kwargs": { + "message": f"❌ 转移失败: {final_volume}mL 从 {from_vessel_id} 到 {to_vessel_id}, 错误: {str(e)}" + } + } + ] + + # ========== 执行冲洗操作 ========== + + debug_print("🔧 步骤8: 检查冲洗操作...") + + if final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0: + debug_print(f"🧽 开始冲洗操作,溶剂: '{final_rinsing_solvent}'") + try: - # 🆕 修复:在这里调用带有循环日志的generate_pump_protocol_with_loop_logging函数 - pump_action_sequence = generate_pump_protocol_with_loop_logging( - G, from_vessel, to_vessel, final_volume, - final_flowrate, final_transfer_flowrate - ) - - debug_print(f"🔓 释放执行锁") - return pump_action_sequence - + if final_rinsing_solvent.strip() != "air": + debug_print(" - 执行液体冲洗...") + rinsing_actions = _generate_rinsing_sequence( + G, from_vessel_id, to_vessel_id, final_rinsing_solvent, + final_rinsing_volume, final_rinsing_repeats, + final_flowrate, final_transfer_flowrate + ) + pump_action_sequence.extend(rinsing_actions) + debug_print(f" - 添加了 {len(rinsing_actions)} 个冲洗动作") + else: + debug_print(" - 执行空气冲洗...") + air_rinsing_actions = _generate_air_rinsing_sequence( + G, from_vessel_id, to_vessel_id, final_rinsing_volume, final_rinsing_repeats, + final_flowrate, final_transfer_flowrate + ) + pump_action_sequence.extend(air_rinsing_actions) + debug_print(f" - 添加了 {len(air_rinsing_actions)} 个空气冲洗动作") except Exception as e: - logger.error(f"❌ 协议生成失败: {str(e)}") - return [ - { - "device_id": "system", - "action_name": "log_message", - "action_kwargs": { - "message": f"❌ 协议生成失败: {str(e)}" - } - } - ] - - -def generate_pump_protocol_with_loop_logging( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, -) -> List[Dict[str, Any]]: - """ - 生成泵操作的动作序列 - 带循环日志版本 - 🔧 修复:正确处理包含电磁阀的路径,并在合适时机打印循环日志 - """ - pump_action_sequence = [] - nodes = G.nodes(data=True) - - # 验证输入参数 - if volume <= 0: - logger.error(f"无效的体积参数: {volume}mL") - return pump_action_sequence - - if flowrate <= 0: - flowrate = 2.5 - logger.warning(f"flowrate <= 0,使用默认值 {flowrate}mL/s") - - if transfer_flowrate <= 0: - transfer_flowrate = 0.5 - logger.warning(f"transfer_flowrate <= 0,使用默认值 {transfer_flowrate}mL/s") - - # 验证容器存在 - if from_vessel not in G.nodes(): - logger.error(f"源容器 '{from_vessel}' 不存在") - return pump_action_sequence - - if to_vessel not in G.nodes(): - logger.error(f"目标容器 '{to_vessel}' 不存在") - return pump_action_sequence - - try: - shortest_path = nx.shortest_path(G, source=from_vessel, target=to_vessel) - debug_print(f"PUMP_TRANSFER: 路径 {from_vessel} -> {to_vessel}: {shortest_path}") - except nx.NetworkXNoPath: - logger.error(f"无法找到从 '{from_vessel}' 到 '{to_vessel}' 的路径") - return pump_action_sequence - - # 🔧 关键修复:正确构建泵骨架,排除容器和电磁阀 - pump_backbone = [] - for node in shortest_path: - # 跳过起始和结束容器 - if node == from_vessel or node == to_vessel: - continue - - # 跳过电磁阀(电磁阀不参与泵操作) - node_data = G.nodes.get(node, {}) - node_class = node_data.get("class", "") or "" - if ("solenoid" in node_class.lower() or "solenoid_valve" in node.lower()): - debug_print(f"PUMP_TRANSFER: 跳过电磁阀 {node}") - continue - - # 只包含多通阀和泵 - if ("multiway" in node_class.lower() or "valve" in node_class.lower() or "pump" in node_class.lower()): - pump_backbone.append(node) - - debug_print(f"PUMP_TRANSFER: 过滤后的泵骨架: {pump_backbone}") - - if not pump_backbone: - debug_print("PUMP_TRANSFER: 没有泵骨架节点,可能是直接容器连接或只有电磁阀") - return pump_action_sequence - - if transfer_flowrate == 0: - transfer_flowrate = flowrate - - try: - pumps_from_node, valve_from_node = build_pump_valve_maps(G, pump_backbone) - except Exception as e: - debug_print(f"PUMP_TRANSFER: 构建泵-阀门映射失败: {str(e)}") - return pump_action_sequence - - if not pumps_from_node: - debug_print("PUMP_TRANSFER: 没有可用的泵映射") - return pump_action_sequence - - # 🔧 修复:安全地获取最小转移体积 - try: - min_transfer_volumes = [] - for node in pump_backbone: - if node in pumps_from_node: - pump_node = pumps_from_node[node] - if pump_node in nodes: - pump_config = nodes[pump_node].get("config", {}) - max_volume = pump_config.get("max_volume") - if max_volume is not None: - min_transfer_volumes.append(max_volume) - - if min_transfer_volumes: - min_transfer_volume = min(min_transfer_volumes) - else: - min_transfer_volume = 25.0 # 默认值 - debug_print(f"PUMP_TRANSFER: 无法获取泵的最大体积,使用默认值: {min_transfer_volume}mL") - except Exception as e: - debug_print(f"PUMP_TRANSFER: 获取最小转移体积失败: {str(e)}") - min_transfer_volume = 25.0 # 默认值 - - repeats = int(np.ceil(volume / min_transfer_volume)) - - if repeats > 1 and (from_vessel.startswith("pump") or to_vessel.startswith("pump")): - logger.error("Cannot transfer volume larger than min_transfer_volume between two pumps.") - return pump_action_sequence - - volume_left = volume - debug_print(f"PUMP_TRANSFER: 需要 {repeats} 次转移,单次最大体积 {min_transfer_volume} mL") - - # 🆕 只在开头打印总体概览 - if repeats > 1: - debug_print(f"🔄 分批转移概览: 总体积 {volume:.2f}mL,需要 {repeats} 次转移") - logger.info(f"🔄 分批转移概览: 总体积 {volume:.2f}mL,需要 {repeats} 次转移") - - # 🔧 创建一个自定义的wait动作,用于在执行时打印日志 - def create_progress_log_action(message: str) -> Dict[str, Any]: - """创建一个特殊的等待动作,在执行时打印进度日志""" - return { - "action_name": "wait", - "action_kwargs": { - "time": 0.1, # 很短的等待时间 - "progress_message": message # 自定义字段,用于进度日志 - } - } - - # 生成泵操作序列 - for i in range(repeats): - current_volume = min(volume_left, min_transfer_volume) - - # 🆕 在每次循环开始时添加进度日志 - if repeats > 1: - start_message = f"🚀 准备开始第 {i+1}/{repeats} 次转移: {current_volume:.2f}mL ({from_vessel} → {to_vessel}) 🚰" - pump_action_sequence.append(create_progress_log_action(start_message)) - - # 🔧 修复:安全地获取边数据 - def get_safe_edge_data(node_a, node_b, key): - try: - edge_data = G.get_edge_data(node_a, node_b) - if edge_data and "port" in edge_data: - port_data = edge_data["port"] - if isinstance(port_data, dict) and key in port_data: - return port_data[key] - return "default" - except Exception as e: - debug_print(f"PUMP_TRANSFER: 获取边数据失败 {node_a}->{node_b}: {str(e)}") - return "default" - - # 从源容器吸液 - if not from_vessel.startswith("pump") and pump_backbone: - first_pump_node = pump_backbone[0] - if first_pump_node in valve_from_node and first_pump_node in pumps_from_node: - port_command = get_safe_edge_data(first_pump_node, from_vessel, first_pump_node) - pump_action_sequence.extend([ - { - "device_id": valve_from_node[first_pump_node], - "action_name": "set_valve_position", - "action_kwargs": { - "command": port_command - } - }, - { - "device_id": pumps_from_node[first_pump_node], - "action_name": "set_position", - "action_kwargs": { - "position": float(current_volume), - "max_velocity": transfer_flowrate - } - } - ]) - pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) - - # 泵间转移 - for nodeA, nodeB in zip(pump_backbone[:-1], pump_backbone[1:]): - if nodeA in valve_from_node and nodeB in valve_from_node and nodeA in pumps_from_node and nodeB in pumps_from_node: - port_a = get_safe_edge_data(nodeA, nodeB, nodeA) - port_b = get_safe_edge_data(nodeB, nodeA, nodeB) - - pump_action_sequence.append([ - { - "device_id": valve_from_node[nodeA], - "action_name": "set_valve_position", - "action_kwargs": { - "command": port_a - } - }, - { - "device_id": valve_from_node[nodeB], - "action_name": "set_valve_position", - "action_kwargs": { - "command": port_b - } - } - ]) - pump_action_sequence.append([ - { - "device_id": pumps_from_node[nodeA], - "action_name": "set_position", - "action_kwargs": { - "position": 0.0, - "max_velocity": transfer_flowrate - } - }, - { - "device_id": pumps_from_node[nodeB], - "action_name": "set_position", - "action_kwargs": { - "position": float(current_volume), - "max_velocity": transfer_flowrate - } - } - ]) - pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) - - # 排液到目标容器 - if not to_vessel.startswith("pump") and pump_backbone: - last_pump_node = pump_backbone[-1] - if last_pump_node in valve_from_node and last_pump_node in pumps_from_node: - port_command = get_safe_edge_data(last_pump_node, to_vessel, last_pump_node) - pump_action_sequence.extend([ - { - "device_id": valve_from_node[last_pump_node], - "action_name": "set_valve_position", - "action_kwargs": { - "command": port_command - } - }, - { - "device_id": pumps_from_node[last_pump_node], - "action_name": "set_position", - "action_kwargs": { - "position": 0.0, - "max_velocity": flowrate - } - } - ]) - pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) - - # 🆕 在每次循环结束时添加完成日志 - if repeats > 1: - remaining_volume = volume_left - current_volume - if remaining_volume > 0: - end_message = f"✅ 第 {i+1}/{repeats} 次转移完成! 剩余 {remaining_volume:.2f}mL 待转移 ⏳" - else: - end_message = f"🎉 第 {i+1}/{repeats} 次转移完成! 全部 {volume:.2f}mL 转移完毕 ✨" - - pump_action_sequence.append(create_progress_log_action(end_message)) - - volume_left -= current_volume - - return pump_action_sequence - - -def generate_pump_protocol_with_rinsing( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float = 0.0, - amount: str = "", - time: float = 0.0, # 🔧 修复:统一使用 time - viscous: bool = False, - rinsing_solvent: str = "", - rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, - solid: bool = False, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, - rate_spec: str = "", - event: str = "", - through: str = "", - **kwargs -) -> List[Dict[str, Any]]: - """ - 增强兼容性的泵转移协议生成器,支持自动体积检测 - """ - debug_print("=" * 60) - debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议") - debug_print(f" 📍 路径: {from_vessel} -> {to_vessel}") - debug_print(f" 🕐 时间戳: {time_module.time()}") - debug_print(f" 📊 原始参数:") - debug_print(f" - volume: {volume} (类型: {type(volume)})") - debug_print(f" - amount: '{amount}'") - debug_print(f" - time: {time}") # 🔧 修复:统一使用 time - debug_print(f" - flowrate: {flowrate}") - debug_print(f" - transfer_flowrate: {transfer_flowrate}") - debug_print(f" - rate_spec: '{rate_spec}'") - debug_print("=" * 60) - - # ========== 🔧 核心修复:智能体积处理 ========== - - debug_print("🔍 步骤1: 开始体积处理...") - - # 1. 处理体积参数 - final_volume = volume - debug_print(f"📋 初始设置: final_volume = {final_volume}") - - # 🔧 修复:如果volume为0(ROS2传入的空值),从容器读取实际体积 - if volume == 0.0: - debug_print("🎯 检测到 volume=0.0,开始自动体积检测...") - - # 直接从源容器读取实际体积 - actual_volume = get_vessel_liquid_volume(G, from_vessel) - debug_print(f"📖 从容器 '{from_vessel}' 读取到体积: {actual_volume}mL") - - if actual_volume > 0: - final_volume = actual_volume - debug_print(f"✅ 成功设置体积为: {final_volume}mL") - else: - final_volume = 10.0 # 如果读取失败,使用默认值 - debug_print(f"⚠️ 无法从容器读取体积,使用默认值: {final_volume}mL") + debug_print(f"⚠️ 冲洗操作失败: {str(e)},跳过冲洗") else: - debug_print(f"📌 体积非零,直接使用: {final_volume}mL") - - # 处理 amount 参数 - if amount and amount.strip(): - debug_print(f"🔍 检测到 amount 参数: '{amount}',开始解析...") - parsed_volume = _parse_amount_to_volume(amount) - debug_print(f"📖 从 amount 解析得到体积: {parsed_volume}mL") - - if parsed_volume > 0: - final_volume = parsed_volume - debug_print(f"✅ 使用从 amount 解析的体积: {final_volume}mL") - elif parsed_volume == 0.0 and amount.lower().strip() == "all": - debug_print("🎯 检测到 amount='all',从容器读取全部体积...") - actual_volume = get_vessel_liquid_volume(G, from_vessel) - if actual_volume > 0: - final_volume = actual_volume - debug_print(f"✅ amount='all',设置体积为: {final_volume}mL") - - # 最终体积验证 - debug_print(f"🔍 步骤2: 最终体积验证...") - if final_volume <= 0: - debug_print(f"❌ 体积无效: {final_volume}mL") - final_volume = 10.0 - debug_print(f"⚠️ 强制设置为默认值: {final_volume}mL") - - debug_print(f"✅ 最终确定体积: {final_volume}mL") - - # 2. 处理流速参数 - debug_print(f"🔍 步骤3: 处理流速参数...") - debug_print(f" - 原始 flowrate: {flowrate}") - debug_print(f" - 原始 transfer_flowrate: {transfer_flowrate}") - - final_flowrate = flowrate if flowrate > 0 else 2.5 - final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 - - if flowrate <= 0: - debug_print(f"⚠️ flowrate <= 0,修正为: {final_flowrate}mL/s") - if transfer_flowrate <= 0: - debug_print(f"⚠️ transfer_flowrate <= 0,修正为: {final_transfer_flowrate}mL/s") - - debug_print(f"✅ 修正后流速: flowrate={final_flowrate}mL/s, transfer_flowrate={final_transfer_flowrate}mL/s") - - # 3. 根据时间计算流速 - if time > 0 and final_volume > 0: # 🔧 修复:统一使用 time - debug_print(f"🔍 步骤4: 根据时间计算流速...") - calculated_flowrate = final_volume / time - debug_print(f" - 计算得到流速: {calculated_flowrate}mL/s") - - if flowrate <= 0 or flowrate == 2.5: - final_flowrate = min(calculated_flowrate, 10.0) - debug_print(f" - 调整 flowrate 为: {final_flowrate}mL/s") - if transfer_flowrate <= 0 or transfer_flowrate == 0.5: - final_transfer_flowrate = min(calculated_flowrate, 5.0) - debug_print(f" - 调整 transfer_flowrate 为: {final_transfer_flowrate}mL/s") - - # 4. 根据速度规格调整 - if rate_spec: - debug_print(f"🔍 步骤5: 根据速度规格调整...") - debug_print(f" - 速度规格: '{rate_spec}'") - - if rate_spec == "dropwise": - final_flowrate = min(final_flowrate, 0.1) - final_transfer_flowrate = min(final_transfer_flowrate, 0.1) - debug_print(f" - dropwise模式,流速调整为: {final_flowrate}mL/s") - elif rate_spec == "slowly": - final_flowrate = min(final_flowrate, 0.5) - final_transfer_flowrate = min(final_transfer_flowrate, 0.3) - debug_print(f" - slowly模式,流速调整为: {final_flowrate}mL/s") - elif rate_spec == "quickly": - final_flowrate = max(final_flowrate, 5.0) - final_transfer_flowrate = max(final_transfer_flowrate, 2.0) - debug_print(f" - quickly模式,流速调整为: {final_flowrate}mL/s") - - # # 5. 处理冲洗参数 - # debug_print(f"🔍 步骤6: 处理冲洗参数...") - # final_rinsing_solvent = rinsing_solvent - # final_rinsing_volume = rinsing_volume if rinsing_volume > 0 else 5.0 - # final_rinsing_repeats = rinsing_repeats if rinsing_repeats > 0 else 2 - - # if rinsing_volume <= 0: - # debug_print(f"⚠️ rinsing_volume <= 0,修正为: {final_rinsing_volume}mL") - # if rinsing_repeats <= 0: - # debug_print(f"⚠️ rinsing_repeats <= 0,修正为: {final_rinsing_repeats}次") - - # # 根据物理属性调整冲洗参数 - # if viscous or solid: - # final_rinsing_repeats = max(final_rinsing_repeats, 3) - # final_rinsing_volume = max(final_rinsing_volume, 10.0) - # debug_print(f"🧪 粘稠/固体物质,调整冲洗参数:{final_rinsing_repeats}次,{final_rinsing_volume}mL") - - # 参数总结 - debug_print("📊 最终参数总结:") - debug_print(f" - 体积: {final_volume}mL") - debug_print(f" - 流速: {final_flowrate}mL/s") - debug_print(f" - 转移流速: {final_transfer_flowrate}mL/s") - # debug_print(f" - 冲洗溶剂: '{final_rinsing_solvent}'") - # debug_print(f" - 冲洗体积: {final_rinsing_volume}mL") - # debug_print(f" - 冲洗次数: {final_rinsing_repeats}次") - - # ========== 执行基础转移 ========== - - debug_print("🔧 步骤7: 开始执行基础转移...") - - try: - debug_print(f" - 调用 generate_pump_protocol...") - debug_print(f" - 参数: G, '{from_vessel}', '{to_vessel}', {final_volume}, {final_flowrate}, {final_transfer_flowrate}") - - pump_action_sequence = generate_pump_protocol( - G, from_vessel, to_vessel, final_volume, - final_flowrate, final_transfer_flowrate - ) - - debug_print(f" - generate_pump_protocol 返回结果:") - debug_print(f" - 动作序列长度: {len(pump_action_sequence)}") - debug_print(f" - 动作序列是否为空: {len(pump_action_sequence) == 0}") - - if not pump_action_sequence: - debug_print("❌ 基础转移协议生成为空,可能是路径问题") - debug_print(f" - 源容器存在: {from_vessel in G.nodes()}") - debug_print(f" - 目标容器存在: {to_vessel in G.nodes()}") - - if from_vessel in G.nodes() and to_vessel in G.nodes(): - try: - path = nx.shortest_path(G, source=from_vessel, target=to_vessel) - debug_print(f" - 路径存在: {path}") - except Exception as path_error: - debug_print(f" - 无法找到路径: {str(path_error)}") - - return [ - { - "device_id": "system", - "action_name": "log_message", - "action_kwargs": { - "message": f"⚠️ 路径问题,无法转移: {final_volume}mL 从 {from_vessel} 到 {to_vessel}" - } - } - ] - - debug_print(f"✅ 基础转移生成了 {len(pump_action_sequence)} 个动作") - - # 打印前几个动作用于调试 - if len(pump_action_sequence) > 0: - debug_print("🔍 前几个动作预览:") - for i, action in enumerate(pump_action_sequence[:3]): - debug_print(f" 动作 {i+1}: {action}") - if len(pump_action_sequence) > 3: - debug_print(f" ... 还有 {len(pump_action_sequence) - 3} 个动作") - - except Exception as e: - debug_print(f"❌ 基础转移失败: {str(e)}") - import traceback - debug_print(f"详细错误: {traceback.format_exc()}") - return [ - { - "device_id": "system", - "action_name": "log_message", - "action_kwargs": { - "message": f"❌ 转移失败: {final_volume}mL 从 {from_vessel} 到 {to_vessel}, 错误: {str(e)}" - } - } - ] - - # ========== 执行冲洗操作 ========== - - # debug_print("🔧 步骤8: 检查冲洗操作...") - - # if final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0: - # debug_print(f"🧽 开始冲洗操作,溶剂: '{final_rinsing_solvent}'") - - # try: - # if final_rinsing_solvent.strip() != "air": - # debug_print(" - 执行液体冲洗...") - # rinsing_actions = _generate_rinsing_sequence( - # G, from_vessel, to_vessel, final_rinsing_solvent, - # final_rinsing_volume, final_rinsing_repeats, - # final_flowrate, final_transfer_flowrate - # ) - # pump_action_sequence.extend(rinsing_actions) - # debug_print(f" - 添加了 {len(rinsing_actions)} 个冲洗动作") - # else: - # debug_print(" - 执行空气冲洗...") - # air_rinsing_actions = _generate_air_rinsing_sequence( - # G, from_vessel, to_vessel, final_rinsing_volume, final_rinsing_repeats, - # final_flowrate, final_transfer_flowrate - # ) - # pump_action_sequence.extend(air_rinsing_actions) - # debug_print(f" - 添加了 {len(air_rinsing_actions)} 个空气冲洗动作") - # except Exception as e: - # debug_print(f"⚠️ 冲洗操作失败: {str(e)},跳过冲洗") - # else: - # debug_print(f"⏭️ 跳过冲洗操作") - # debug_print(f" - 溶剂: '{final_rinsing_solvent}'") - # debug_print(f" - 次数: {final_rinsing_repeats}") - # debug_print(f" - 条件满足: {bool(final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0)}") - + debug_print(f"⏭️ 跳过冲洗操作") + debug_print(f" - 溶剂: '{final_rinsing_solvent}'") + debug_print(f" - 次数: {final_rinsing_repeats}") + debug_print(f" - 条件满足: {bool(final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0)}") + # ========== 最终结果 ========== - + debug_print("=" * 60) debug_print(f"🎉 PUMP_TRANSFER: 协议生成完成") debug_print(f" 📊 总动作数: {len(pump_action_sequence)}") debug_print(f" 📋 最终体积: {final_volume}mL") - debug_print(f" 🚀 执行路径: {from_vessel} -> {to_vessel}") - + debug_print(f" 🚀 执行路径: {from_vessel_id} -> {to_vessel_id}") + # 最终验证 if len(pump_action_sequence) == 0: debug_print("🚨 协议生成结果为空!这是异常情况") @@ -1140,578 +739,15 @@ def generate_pump_protocol_with_rinsing( } } ] - + debug_print("=" * 60) return pump_action_sequence -async def generate_pump_protocol_with_rinsing_async( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float = 0.0, - amount: str = "", - time: float = 0.0, - viscous: bool = False, - rinsing_solvent: str = "", - rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, - solid: bool = False, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, - rate_spec: str = "", - event: str = "", - through: str = "", - **kwargs -) -> List[Dict[str, Any]]: - """ - 异步版本的泵转移协议生成器,避免并发问题 - """ - debug_print("=" * 60) - debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (异步版本)") - debug_print(f" 📍 路径: {from_vessel} -> {to_vessel}") - debug_print(f" 🕐 时间戳: {time_module.time()}") - debug_print("=" * 60) - - # 添加唯一标识符 - protocol_id = f"pump_transfer_{int(time_module.time() * 1000000)}" - debug_print(f"📋 协议ID: {protocol_id}") - - # 调用原有的同步版本 - result = generate_pump_protocol_with_rinsing( - G, from_vessel, to_vessel, volume, amount, time, viscous, - rinsing_solvent, rinsing_volume, rinsing_repeats, solid, - flowrate, transfer_flowrate, rate_spec, event, through, **kwargs - ) - - # 为每个动作添加唯一标识 - for i, action in enumerate(result): - if isinstance(action, dict): - action['_protocol_id'] = protocol_id - action['_action_sequence'] = i - action['_timestamp'] = time_module.time() - - debug_print(f"📊 协议 {protocol_id} 生成完成,共 {len(result)} 个动作") - return result - -# 保持原有的同步版本兼容性 -def generate_pump_protocol_with_rinsing( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float = 0.0, - amount: str = "", - time: float = 0.0, - viscous: bool = False, - rinsing_solvent: str = "", - rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, - solid: bool = False, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, - rate_spec: str = "", - event: str = "", - through: str = "", - **kwargs -) -> List[Dict[str, Any]]: - """ - 原有的同步版本,添加防冲突机制 - """ - - # 添加执行锁,防止并发调用 - import threading - if not hasattr(generate_pump_protocol_with_rinsing, '_lock'): - generate_pump_protocol_with_rinsing._lock = threading.Lock() - - with generate_pump_protocol_with_rinsing._lock: - debug_print("=" * 60) - debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (同步版本)") - debug_print(f" 📍 路径: {from_vessel} -> {to_vessel}") - debug_print(f" 🕐 时间戳: {time_module.time()}") - debug_print(f" 🔒 获得执行锁") - debug_print("=" * 60) - - # 短暂延迟,避免快速重复调用 - time_module.sleep(0.01) - - debug_print("🔍 步骤1: 开始体积处理...") - - # 1. 处理体积参数 - final_volume = volume - debug_print(f"📋 初始设置: final_volume = {final_volume}") - - # 🔧 修复:如果volume为0(ROS2传入的空值),从容器读取实际体积 - if volume == 0.0: - debug_print("🎯 检测到 volume=0.0,开始自动体积检测...") - - # 直接从源容器读取实际体积 - actual_volume = get_vessel_liquid_volume(G, from_vessel) - debug_print(f"📖 从容器 '{from_vessel}' 读取到体积: {actual_volume}mL") - - if actual_volume > 0: - final_volume = actual_volume - debug_print(f"✅ 成功设置体积为: {final_volume}mL") - else: - final_volume = 10.0 # 如果读取失败,使用默认值 - logger.warning(f"⚠️ 无法从容器读取体积,使用默认值: {final_volume}mL") - else: - debug_print(f"📌 体积非零,直接使用: {final_volume}mL") - - # 处理 amount 参数 - if amount and amount.strip(): - debug_print(f"🔍 检测到 amount 参数: '{amount}',开始解析...") - parsed_volume = _parse_amount_to_volume(amount) - debug_print(f"📖 从 amount 解析得到体积: {parsed_volume}mL") - - if parsed_volume > 0: - final_volume = parsed_volume - debug_print(f"✅ 使用从 amount 解析的体积: {final_volume}mL") - elif parsed_volume == 0.0 and amount.lower().strip() == "all": - debug_print("🎯 检测到 amount='all',从容器读取全部体积...") - actual_volume = get_vessel_liquid_volume(G, from_vessel) - if actual_volume > 0: - final_volume = actual_volume - debug_print(f"✅ amount='all',设置体积为: {final_volume}mL") - - # 最终体积验证 - debug_print(f"🔍 步骤2: 最终体积验证...") - if final_volume <= 0: - logger.error(f"❌ 体积无效: {final_volume}mL") - final_volume = 10.0 - logger.warning(f"⚠️ 强制设置为默认值: {final_volume}mL") - - debug_print(f"✅ 最终确定体积: {final_volume}mL") - - # 2. 处理流速参数 - debug_print(f"🔍 步骤3: 处理流速参数...") - debug_print(f" - 原始 flowrate: {flowrate}") - debug_print(f" - 原始 transfer_flowrate: {transfer_flowrate}") - - final_flowrate = flowrate if flowrate > 0 else 2.5 - final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 - - if flowrate <= 0: - logger.warning(f"⚠️ flowrate <= 0,修正为: {final_flowrate}mL/s") - if transfer_flowrate <= 0: - logger.warning(f"⚠️ transfer_flowrate <= 0,修正为: {final_transfer_flowrate}mL/s") - - debug_print(f"✅ 修正后流速: flowrate={final_flowrate}mL/s, transfer_flowrate={final_transfer_flowrate}mL/s") - - # 3. 根据时间计算流速 - if time > 0 and final_volume > 0: - debug_print(f"🔍 步骤4: 根据时间计算流速...") - calculated_flowrate = final_volume / time - debug_print(f" - 计算得到流速: {calculated_flowrate}mL/s") - - if flowrate <= 0 or flowrate == 2.5: - final_flowrate = min(calculated_flowrate, 10.0) - debug_print(f" - 调整 flowrate 为: {final_flowrate}mL/s") - if transfer_flowrate <= 0 or transfer_flowrate == 0.5: - final_transfer_flowrate = min(calculated_flowrate, 5.0) - debug_print(f" - 调整 transfer_flowrate 为: {final_transfer_flowrate}mL/s") - - # 4. 根据速度规格调整 - if rate_spec: - debug_print(f"🔍 步骤5: 根据速度规格调整...") - debug_print(f" - 速度规格: '{rate_spec}'") - - if rate_spec == "dropwise": - final_flowrate = min(final_flowrate, 0.1) - final_transfer_flowrate = min(final_transfer_flowrate, 0.1) - debug_print(f" - dropwise模式,流速调整为: {final_flowrate}mL/s") - elif rate_spec == "slowly": - final_flowrate = min(final_flowrate, 0.5) - final_transfer_flowrate = min(final_transfer_flowrate, 0.3) - debug_print(f" - slowly模式,流速调整为: {final_flowrate}mL/s") - elif rate_spec == "quickly": - final_flowrate = max(final_flowrate, 5.0) - final_transfer_flowrate = max(final_transfer_flowrate, 2.0) - debug_print(f" - quickly模式,流速调整为: {final_flowrate}mL/s") - - # # 5. 处理冲洗参数 - # debug_print(f"🔍 步骤6: 处理冲洗参数...") - # final_rinsing_solvent = rinsing_solvent - # final_rinsing_volume = rinsing_volume if rinsing_volume > 0 else 5.0 - # final_rinsing_repeats = rinsing_repeats if rinsing_repeats > 0 else 2 - - # if rinsing_volume <= 0: - # logger.warning(f"⚠️ rinsing_volume <= 0,修正为: {final_rinsing_volume}mL") - # if rinsing_repeats <= 0: - # logger.warning(f"⚠️ rinsing_repeats <= 0,修正为: {final_rinsing_repeats}次") - - # # 根据物理属性调整冲洗参数 - # if viscous or solid: - # final_rinsing_repeats = max(final_rinsing_repeats, 3) - # final_rinsing_volume = max(final_rinsing_volume, 10.0) - # debug_print(f"🧪 粘稠/固体物质,调整冲洗参数:{final_rinsing_repeats}次,{final_rinsing_volume}mL") - - # 参数总结 - debug_print("📊 最终参数总结:") - debug_print(f" - 体积: {final_volume}mL") - debug_print(f" - 流速: {final_flowrate}mL/s") - debug_print(f" - 转移流速: {final_transfer_flowrate}mL/s") - # debug_print(f" - 冲洗溶剂: '{final_rinsing_solvent}'") - # debug_print(f" - 冲洗体积: {final_rinsing_volume}mL") - # debug_print(f" - 冲洗次数: {final_rinsing_repeats}次") - - # ========== 执行基础转移 ========== - - debug_print("🔧 步骤7: 开始执行基础转移...") - - try: - debug_print(f" - 调用 generate_pump_protocol...") - debug_print(f" - 参数: G, '{from_vessel}', '{to_vessel}', {final_volume}, {final_flowrate}, {final_transfer_flowrate}") - - pump_action_sequence = generate_pump_protocol( - G, from_vessel, to_vessel, final_volume, - final_flowrate, final_transfer_flowrate - ) - - debug_print(f" - generate_pump_protocol 返回结果:") - debug_print(f" - 动作序列长度: {len(pump_action_sequence)}") - debug_print(f" - 动作序列是否为空: {len(pump_action_sequence) == 0}") - - if not pump_action_sequence: - debug_print("❌ 基础转移协议生成为空,可能是路径问题") - debug_print(f" - 源容器存在: {from_vessel in G.nodes()}") - debug_print(f" - 目标容器存在: {to_vessel in G.nodes()}") - - if from_vessel in G.nodes() and to_vessel in G.nodes(): - try: - path = nx.shortest_path(G, source=from_vessel, target=to_vessel) - debug_print(f" - 路径存在: {path}") - except Exception as path_error: - debug_print(f" - 无法找到路径: {str(path_error)}") - - return [ - { - "device_id": "system", - "action_name": "log_message", - "action_kwargs": { - "message": f"⚠️ 路径问题,无法转移: {final_volume}mL 从 {from_vessel} 到 {to_vessel}" - } - } - ] - - debug_print(f"✅ 基础转移生成了 {len(pump_action_sequence)} 个动作") - - # 打印前几个动作用于调试 - if len(pump_action_sequence) > 0: - debug_print("🔍 前几个动作预览:") - for i, action in enumerate(pump_action_sequence[:3]): - debug_print(f" 动作 {i+1}: {action}") - if len(pump_action_sequence) > 3: - debug_print(f" ... 还有 {len(pump_action_sequence) - 3} 个动作") - - except Exception as e: - debug_print(f"❌ 基础转移失败: {str(e)}") - import traceback - debug_print(f"详细错误: {traceback.format_exc()}") - return [ - { - "device_id": "system", - "action_name": "log_message", - "action_kwargs": { - "message": f"❌ 转移失败: {final_volume}mL 从 {from_vessel} 到 {to_vessel}, 错误: {str(e)}" - } - } - ] - - # ========== 执行冲洗操作 ========== - - # debug_print("🔧 步骤8: 检查冲洗操作...") - - # if final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0: - # debug_print(f"🧽 开始冲洗操作,溶剂: '{final_rinsing_solvent}'") - - # try: - # if final_rinsing_solvent.strip() != "air": - # debug_print(" - 执行液体冲洗...") - # rinsing_actions = _generate_rinsing_sequence( - # G, from_vessel, to_vessel, final_rinsing_solvent, - # final_rinsing_volume, final_rinsing_repeats, - # final_flowrate, final_transfer_flowrate - # ) - # pump_action_sequence.extend(rinsing_actions) - # debug_print(f" - 添加了 {len(rinsing_actions)} 个冲洗动作") - # else: - # debug_print(" - 执行空气冲洗...") - # air_rinsing_actions = _generate_air_rinsing_sequence( - # G, from_vessel, to_vessel, final_rinsing_volume, final_rinsing_repeats, - # final_flowrate, final_transfer_flowrate - # ) - # pump_action_sequence.extend(air_rinsing_actions) - # debug_print(f" - 添加了 {len(air_rinsing_actions)} 个空气冲洗动作") - # except Exception as e: - # debug_print(f"⚠️ 冲洗操作失败: {str(e)},跳过冲洗") - # else: - # debug_print(f"⏭️ 跳过冲洗操作") - # debug_print(f" - 溶剂: '{final_rinsing_solvent}'") - # debug_print(f" - 次数: {final_rinsing_repeats}") - # debug_print(f" - 条件满足: {bool(final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0)}") - - # ========== 最终结果 ========== - - debug_print("=" * 60) - debug_print(f"🎉 PUMP_TRANSFER: 协议生成完成") - debug_print(f" 📊 总动作数: {len(pump_action_sequence)}") - debug_print(f" 📋 最终体积: {final_volume}mL") - debug_print(f" 🚀 执行路径: {from_vessel} -> {to_vessel}") - - # 最终验证 - if len(pump_action_sequence) == 0: - debug_print("🚨 协议生成结果为空!这是异常情况") - return [ - { - "device_id": "system", - "action_name": "log_message", - "action_kwargs": { - "message": f"🚨 协议生成失败: 无法生成任何动作序列" - } - } - ] - - debug_print("=" * 60) - return pump_action_sequence - - -async def generate_pump_protocol_with_rinsing_async( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float = 0.0, - amount: str = "", - time: float = 0.0, - viscous: bool = False, - rinsing_solvent: str = "", - rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, - solid: bool = False, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, - rate_spec: str = "", - event: str = "", - through: str = "", - **kwargs -) -> List[Dict[str, Any]]: - """ - 异步版本的泵转移协议生成器,避免并发问题 - """ - debug_print("=" * 60) - debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (异步版本)") - debug_print(f" 📍 路径: {from_vessel} -> {to_vessel}") - debug_print(f" 🕐 时间戳: {time_module.time()}") - debug_print("=" * 60) - - # 添加唯一标识符 - protocol_id = f"pump_transfer_{int(time_module.time() * 1000000)}" - debug_print(f"📋 协议ID: {protocol_id}") - - # 调用原有的同步版本 - result = generate_pump_protocol_with_rinsing( - G, from_vessel, to_vessel, volume, amount, time, viscous, - rinsing_solvent, rinsing_volume, rinsing_repeats, solid, - flowrate, transfer_flowrate, rate_spec, event, through, **kwargs - ) - - # 为每个动作添加唯一标识 - for i, action in enumerate(result): - if isinstance(action, dict): - action['_protocol_id'] = protocol_id - action['_action_sequence'] = i - action['_timestamp'] = time_module.time() - - debug_print(f"📊 协议 {protocol_id} 生成完成,共 {len(result)} 个动作") - return result - -# 保持原有的同步版本兼容性 -def generate_pump_protocol_with_rinsing( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float = 0.0, - amount: str = "", - time: float = 0.0, - viscous: bool = False, - rinsing_solvent: str = "", - rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, - solid: bool = False, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, - rate_spec: str = "", - event: str = "", - through: str = "", - **kwargs -) -> List[Dict[str, Any]]: - """ - 原有的同步版本,添加防冲突机制 - """ - - # 添加执行锁,防止并发调用 - import threading - if not hasattr(generate_pump_protocol_with_rinsing, '_lock'): - generate_pump_protocol_with_rinsing._lock = threading.Lock() - - with generate_pump_protocol_with_rinsing._lock: - debug_print("=" * 60) - debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (同步版本)") - debug_print(f" 📍 路径: {from_vessel} -> {to_vessel}") - debug_print(f" 🕐 时间戳: {time_module.time()}") - debug_print(f" 🔒 获得执行锁") - debug_print("=" * 60) - - # 短暂延迟,避免快速重复调用 - time_module.sleep(0.01) - - debug_print("🔍 步骤1: 开始体积处理...") - - # 1. 处理体积参数 - final_volume = volume - debug_print(f"📋 初始设置: final_volume = {final_volume}") - - # 🔧 修复:如果volume为0(ROS2传入的空值),从容器读取实际体积 - if volume == 0.0: - debug_print("🎯 检测到 volume=0.0,开始自动体积检测...") - - # 直接从源容器读取实际体积 - actual_volume = get_vessel_liquid_volume(G, from_vessel) - debug_print(f"📖 从容器 '{from_vessel}' 读取到体积: {actual_volume}mL") - - if actual_volume > 0: - final_volume = actual_volume - debug_print(f"✅ 成功设置体积为: {final_volume}mL") - else: - final_volume = 10.0 # 如果读取失败,使用默认值 - logger.warning(f"⚠️ 无法从容器读取体积,使用默认值: {final_volume}mL") - else: - debug_print(f"📌 体积非零,直接使用: {final_volume}mL") - - # 处理 amount 参数 - if amount and amount.strip(): - debug_print(f"🔍 检测到 amount 参数: '{amount}',开始解析...") - parsed_volume = _parse_amount_to_volume(amount) - debug_print(f"📖 从 amount 解析得到体积: {parsed_volume}mL") - - if parsed_volume > 0: - final_volume = parsed_volume - debug_print(f"✅ 使用从 amount 解析的体积: {final_volume}mL") - elif parsed_volume == 0.0 and amount.lower().strip() == "all": - debug_print("🎯 检测到 amount='all',从容器读取全部体积...") - actual_volume = get_vessel_liquid_volume(G, from_vessel) - if actual_volume > 0: - final_volume = actual_volume - debug_print(f"✅ amount='all',设置体积为: {final_volume}mL") - - # 最终体积验证 - debug_print(f"🔍 步骤2: 最终体积验证...") - if final_volume <= 0: - logger.error(f"❌ 体积无效: {final_volume}mL") - final_volume = 10.0 - logger.warning(f"⚠️ 强制设置为默认值: {final_volume}mL") - - debug_print(f"✅ 最终确定体积: {final_volume}mL") - - # 2. 处理流速参数 - debug_print(f"🔍 步骤3: 处理流速参数...") - debug_print(f" - 原始 flowrate: {flowrate}") - debug_print(f" - 原始 transfer_flowrate: {transfer_flowrate}") - - final_flowrate = flowrate if flowrate > 0 else 2.5 - final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 - - if flowrate <= 0: - logger.warning(f"⚠️ flowrate <= 0,修正为: {final_flowrate}mL/s") - if transfer_flowrate <= 0: - logger.warning(f"⚠️ transfer_flowrate <= 0,修正为: {final_transfer_flowrate}mL/s") - - debug_print(f"✅ 修正后流速: flowrate={final_flowrate}mL/s, transfer_flowrate={final_transfer_flowrate}mL/s") - - # 3. 根据时间计算流速 - if time > 0 and final_volume > 0: - debug_print(f"🔍 步骤4: 根据时间计算流速...") - calculated_flowrate = final_volume / time - debug_print(f" - 计算得到流速: {calculated_flowrate}mL/s") - - if flowrate <= 0 or flowrate == 2.5: - final_flowrate = min(calculated_flowrate, 10.0) - debug_print(f" - 调整 flowrate 为: {final_flowrate}mL/s") - if transfer_flowrate <= 0 or transfer_flowrate == 0.5: - final_transfer_flowrate = min(calculated_flowrate, 5.0) - debug_print(f" - 调整 transfer_flowrate 为: {final_transfer_flowrate}mL/s") - - # 4. 根据速度规格调整 - if rate_spec: - debug_print(f"🔍 步骤5: 根据速度规格调整...") - debug_print(f" - 速度规格: '{rate_spec}'") - - if rate_spec == "dropwise": - final_flowrate = min(final_flowrate, 0.1) - final_transfer_flowrate = min(final_transfer_flowrate, 0.1) - debug_print(f" - dropwise模式,流速调整为: {final_flowrate}mL/s") - elif rate_spec == "slowly": - final_flowrate = min(final_flowrate, 0.5) - final_transfer_flowrate = min(final_transfer_flowrate, 0.3) - debug_print(f" - slowly模式,流速调整为: {final_flowrate}mL/s") - elif rate_spec == "quickly": - final_flowrate = max(final_flowrate, 5.0) - final_transfer_flowrate = max(final_transfer_flowrate, 2.0) - debug_print(f" - quickly模式,流速调整为: {final_flowrate}mL/s") - - # # 5. 处理冲洗参数 - # debug_print(f"🔍 步骤6: 处理冲洗参数...") - # final_rinsing_solvent = rinsing_solvent - # final_rinsing_volume = rinsing_volume if rinsing_volume > 0 else 5.0 - # final_rinsing_repeats = rinsing_repeats if rinsing_repeats > 0 else 2 - - # if rinsing_volume <= 0: - # logger.warning(f"⚠️ rinsing_volume <= 0,修正为: {final_rinsing_volume}mL") - # if rinsing_repeats <= 0: - # logger.warning(f"⚠️ rinsing_repeats <= 0,修正为: {final_rinsing_repeats}次") - - # # 根据物理属性调整冲洗参数 - # if viscous or solid: - # final_rinsing_repeats = max(final_rinsing_repeats, 3) - # final_rinsing_volume = max(final_rinsing_volume, 10.0) - # debug_print(f"🧪 粘稠/固体物质,调整冲洗参数:{final_rinsing_repeats}次,{final_rinsing_volume}mL") - - try: - pump_action_sequence = generate_pump_protocol( - G, from_vessel, to_vessel, final_volume, - flowrate, transfer_flowrate - ) - - # 为每个动作添加唯一标识 - # for i, action in enumerate(pump_action_sequence): - # if isinstance(action, dict): - # action['_protocol_id'] = protocol_id - # action['_action_sequence'] = i - # elif isinstance(action, list): - # for j, sub_action in enumerate(action): - # if isinstance(sub_action, dict): - # sub_action['_protocol_id'] = protocol_id - # sub_action['_action_sequence'] = f"{i}_{j}" - # - # debug_print(f"📊 协议 {protocol_id} 生成完成,共 {len(pump_action_sequence)} 个动作") - debug_print(f"🔓 释放执行锁") - return pump_action_sequence - - except Exception as e: - logger.error(f"❌ 协议生成失败: {str(e)}") - return [ - { - "device_id": "system", - "action_name": "log_message", - "action_kwargs": { - "message": f"❌ 协议生成失败: {str(e)}" - } - } - ] - def _parse_amount_to_volume(amount: str) -> float: """解析 amount 字符串为体积""" debug_print(f"🔍 解析 amount: '{amount}'") - + if not amount: debug_print(" - amount 为空,返回 0.0") return 0.0 @@ -1728,7 +764,7 @@ def _parse_amount_to_volume(amount: str) -> float: import re numbers = re.findall(r'[\d.]+', amount) debug_print(f" - 提取到的数字: {numbers}") - + if numbers: volume = float(numbers[0]) debug_print(f" - 基础体积: {volume}") @@ -1753,15 +789,21 @@ def _parse_amount_to_volume(amount: str) -> float: return 0.0 -def _generate_rinsing_sequence(G: nx.DiGraph, from_vessel: str, to_vessel: str, - rinsing_solvent: str, rinsing_volume: float, - rinsing_repeats: int, flowrate: float, - transfer_flowrate: float) -> List[Dict[str, Any]]: +def _generate_rinsing_sequence( + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + rinsing_solvent: str, + rinsing_volume: float, + rinsing_repeats: int, + flowrate: float, + transfer_flowrate: float +) -> List[Dict[str, Any]]: """生成冲洗动作序列""" rinsing_actions = [] try: - shortest_path = nx.shortest_path(G, source=from_vessel, target=to_vessel) + shortest_path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) pump_backbone = shortest_path[1:-1] if not pump_backbone: @@ -1791,27 +833,31 @@ def _generate_rinsing_sequence(G: nx.DiGraph, from_vessel: str, to_vessel: str, # 清洗泵系统 rinsing_actions.extend( - generate_pump_protocol(G, solvent_vessel, pump_backbone[0], min_transfer_volume, flowrate, transfer_flowrate) + generate_pump_protocol(G, solvent_vessel, pump_backbone[0], min_transfer_volume, flowrate, + transfer_flowrate) ) if len(pump_backbone) > 1: rinsing_actions.extend( - generate_pump_protocol(G, pump_backbone[0], pump_backbone[-1], min_transfer_volume, flowrate, transfer_flowrate) + generate_pump_protocol(G, pump_backbone[0], pump_backbone[-1], min_transfer_volume, flowrate, + transfer_flowrate) ) # 排到废液容器 if waste_vessel in G.nodes(): rinsing_actions.extend( - generate_pump_protocol(G, pump_backbone[-1], waste_vessel, min_transfer_volume, flowrate, transfer_flowrate) + generate_pump_protocol(G, pump_backbone[-1], waste_vessel, min_transfer_volume, flowrate, + transfer_flowrate) ) # 第一种冲洗溶剂稀释源容器和目标容器 if solvent == rinsing_solvents[0]: rinsing_actions.extend( - generate_pump_protocol(G, solvent_vessel, from_vessel, rinsing_volume, flowrate, transfer_flowrate) + generate_pump_protocol(G, solvent_vessel, from_vessel_id, rinsing_volume, flowrate, + transfer_flowrate) ) rinsing_actions.extend( - generate_pump_protocol(G, solvent_vessel, to_vessel, rinsing_volume, flowrate, transfer_flowrate) + generate_pump_protocol(G, solvent_vessel, to_vessel_id, rinsing_volume, flowrate, transfer_flowrate) ) except Exception as e: @@ -1820,9 +866,9 @@ def _generate_rinsing_sequence(G: nx.DiGraph, from_vessel: str, to_vessel: str, return rinsing_actions -def _generate_air_rinsing_sequence(G: nx.DiGraph, from_vessel: str, to_vessel: str, - rinsing_volume: float, repeats: int, - flowrate: float, transfer_flowrate: float) -> List[Dict[str, Any]]: +def _generate_air_rinsing_sequence(G: nx.DiGraph, from_vessel_id: str, to_vessel_id: str, + rinsing_volume: float, repeats: int, + flowrate: float, transfer_flowrate: float) -> List[Dict[str, Any]]: """生成空气冲洗序列""" air_rinsing_actions = [] @@ -1835,15 +881,15 @@ def _generate_air_rinsing_sequence(G: nx.DiGraph, from_vessel: str, to_vessel: s for _ in range(repeats): # 空气冲洗源容器 air_rinsing_actions.extend( - generate_pump_protocol(G, air_vessel, from_vessel, rinsing_volume, flowrate, transfer_flowrate) + generate_pump_protocol(G, air_vessel, from_vessel_id, rinsing_volume, flowrate, transfer_flowrate) ) # 空气冲洗目标容器 air_rinsing_actions.extend( - generate_pump_protocol(G, air_vessel, to_vessel, rinsing_volume, flowrate, transfer_flowrate) + generate_pump_protocol(G, air_vessel, to_vessel_id, rinsing_volume, flowrate, transfer_flowrate) ) except Exception as e: logger.warning(f"空气冲洗失败: {str(e)}") - return air_rinsing_actions + return air_rinsing_actions \ No newline at end of file diff --git a/unilabos/compile/utils/vessel_parser.py b/unilabos/compile/utils/vessel_parser.py index eb9218aa..a7bf673b 100644 --- a/unilabos/compile/utils/vessel_parser.py +++ b/unilabos/compile/utils/vessel_parser.py @@ -125,6 +125,29 @@ def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: """ debug_print(f"🔍 正在查找溶剂 '{solvent}' 的容器... 🧪") + # 第四步:通过数据中的试剂信息匹配 + debug_print(" 🧪 步骤1: 数据试剂信息匹配...") + for node_id in G.nodes(): + debug_print(f"查找 id {node_id}, type={G.nodes[node_id].get('type')}, data={G.nodes[node_id].get('data', {})} 的容器...") + if G.nodes[node_id].get('type') == 'container': + vessel_data = G.nodes[node_id].get('data', {}) + + # 检查 data 中的 reagent_name 字段 + reagent_name = vessel_data.get('reagent_name', '').lower() + if reagent_name and solvent.lower() == reagent_name: + debug_print(f" 🎉 通过data.reagent_name匹配找到容器: {node_id} (试剂: {reagent_name}) ✨") + return node_id + + # 检查 data 中的液体信息 + liquids = vessel_data.get('liquid', []) or vessel_data.get('liquids', []) + for liquid in liquids: + if isinstance(liquid, dict): + liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower() + + if solvent.lower() == liquid_type or solvent.lower() in liquid_type: + debug_print(f" 🎉 通过液体类型匹配找到容器: {node_id} (液体类型: {liquid_type}) ✨") + return node_id + # 构建可能的容器名称 possible_names = [ f"flask_{solvent}", @@ -140,14 +163,14 @@ def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: debug_print(f"📋 候选容器名称: {possible_names[:3]}... (共{len(possible_names)}个) 📝") # 第一步:通过容器名称匹配 - debug_print(" 🎯 步骤1: 精确名称匹配...") + debug_print(" 🎯 步骤2: 精确名称匹配...") for vessel_name in possible_names: if vessel_name in G.nodes(): debug_print(f" 🎉 通过名称匹配找到容器: {vessel_name} ✨") return vessel_name # 第二步:通过模糊匹配(节点ID和名称) - debug_print(" 🔍 步骤2: 模糊名称匹配...") + debug_print(" 🔍 步骤3: 模糊名称匹配...") for node_id in G.nodes(): if G.nodes[node_id].get('type') == 'container': node_name = G.nodes[node_id].get('name', '').lower() @@ -157,7 +180,7 @@ def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: return node_id # 第三步:通过配置中的试剂信息匹配 - debug_print(" 🧪 步骤3: 配置试剂信息匹配...") + debug_print(" 🧪 步骤4: 配置试剂信息匹配...") for node_id in G.nodes(): if G.nodes[node_id].get('type') == 'container': # 检查 config 中的 reagent 字段 @@ -168,28 +191,6 @@ def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: debug_print(f" 🎉 通过config.reagent匹配找到容器: {node_id} (试剂: {config_reagent}) ✨") return node_id - # 第四步:通过数据中的试剂信息匹配 - debug_print(" 🧪 步骤4: 数据试剂信息匹配...") - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - vessel_data = G.nodes[node_id].get('data', {}) - - # 检查 data 中的 reagent_name 字段 - reagent_name = vessel_data.get('reagent_name', '').lower() - if reagent_name and solvent.lower() == reagent_name: - debug_print(f" 🎉 通过data.reagent_name匹配找到容器: {node_id} (试剂: {reagent_name}) ✨") - return node_id - - # 检查 data 中的液体信息 - liquids = vessel_data.get('liquid', []) - for liquid in liquids: - if isinstance(liquid, dict): - liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower() - - if solvent.lower() in liquid_type: - debug_print(f" 🎉 通过液体类型匹配找到容器: {node_id} (液体类型: {liquid_type}) ✨") - return node_id - # 第五步:部分匹配(如果前面都没找到) debug_print(" 🔍 步骤5: 部分匹配...") for node_id in G.nodes(): diff --git a/unilabos/devices/virtual/virtual_multiway_valve.py b/unilabos/devices/virtual/virtual_multiway_valve.py index 1d9904e6..d0d792e7 100644 --- a/unilabos/devices/virtual/virtual_multiway_valve.py +++ b/unilabos/devices/virtual/virtual_multiway_valve.py @@ -21,19 +21,6 @@ class VirtualMultiwayValve: self._current_position = 0 # 默认在0号位(transfer pump位置) self._target_position = 0 - # 位置映射说明 - self.position_map = { - 0: "transfer_pump", # 0号位连接转移泵 - 1: "port_1", # 1号位 - 2: "port_2", # 2号位 - 3: "port_3", # 3号位 - 4: "port_4", # 4号位 - 5: "port_5", # 5号位 - 6: "port_6", # 6号位 - 7: "port_7", # 7号位 - 8: "port_8" # 8号位 - } - print(f"🔄 === 虚拟多通阀门已创建 === ✨") print(f"🎯 端口: {port} | 📊 位置范围: 0-{self.max_positions} | 🏠 初始位置: 0 (transfer_pump)") self.logger.info(f"🔧 多通阀门初始化: 端口={port}, 最大位置={self.max_positions}") @@ -60,7 +47,7 @@ class VirtualMultiwayValve: def get_current_port(self) -> str: """获取当前连接的端口名称 🔌""" - return self.position_map.get(self._current_position, "unknown") + return self._current_position def set_position(self, command: Union[int, str]): """ @@ -115,7 +102,7 @@ class VirtualMultiwayValve: old_position = self._current_position old_port = self.get_current_port() - self.logger.info(f"🔄 阀门切换: {old_position}({old_port}) → {pos}({self.position_map.get(pos, 'unknown')}) {pos_emoji}") + self.logger.info(f"🔄 阀门切换: {old_position}({old_port}) → {pos} {pos_emoji}") self._status = "Busy" self._valve_state = "Moving" @@ -190,6 +177,17 @@ class VirtualMultiwayValve: """获取阀门位置 - 兼容性方法 📍""" return self._current_position + def set_valve_position(self, command: Union[int, str]): + """ + 设置阀门位置 - 兼容pump_protocol调用 🎯 + 这是set_position的别名方法,用于兼容pump_protocol.py + + Args: + command: 目标位置 (0-8) 或位置字符串 + """ + # 删除debug日志:self.logger.debug(f"🎯 兼容性调用: set_valve_position({command})") + return self.set_position(command) + def is_at_position(self, position: int) -> bool: """检查是否在指定位置 🎯""" result = self._current_position == position @@ -210,17 +208,6 @@ class VirtualMultiwayValve: # 删除debug日志:self.logger.debug(f"🔌 端口{port_number}检查: {port_status} (当前位置: {self._current_position})") return result - def get_available_positions(self) -> list: - """获取可用位置列表 📋""" - positions = list(range(0, self.max_positions + 1)) - # 删除debug日志:self.logger.debug(f"📋 可用位置: {positions}") - return positions - - def get_available_ports(self) -> Dict[int, str]: - """获取可用端口映射 🗺️""" - # 删除debug日志:self.logger.debug(f"🗺️ 端口映射: {self.position_map}") - return self.position_map.copy() - def reset(self): """重置阀门到transfer pump位置(0号位)🔄""" self.logger.info(f"🔄 重置阀门到泵位置...") @@ -259,17 +246,6 @@ class VirtualMultiwayValve: return f"🔄 VirtualMultiwayValve({status_emoji} 位置: {self._current_position}/{self.max_positions}, 端口: {current_port}, 状态: {self._status})" - def set_valve_position(self, command: Union[int, str]): - """ - 设置阀门位置 - 兼容pump_protocol调用 🎯 - 这是set_position的别名方法,用于兼容pump_protocol.py - - Args: - command: 目标位置 (0-8) 或位置字符串 - """ - # 删除debug日志:self.logger.debug(f"🎯 兼容性调用: set_valve_position({command})") - return self.set_position(command) - # 使用示例 if __name__ == "__main__": @@ -291,10 +267,6 @@ if __name__ == "__main__": print(f"\n🔌 切换到2号位: {valve.set_to_port(2)}") print(f"📍 当前状态: {valve}") - # 显示所有可用位置 - print(f"\n📋 可用位置: {valve.get_available_positions()}") - print(f"🗺️ 端口映射: {valve.get_available_ports()}") - # 测试切换功能 print(f"\n🔄 智能切换测试:") print(f"当前位置: {valve._current_position}") diff --git a/unilabos/devices/virtual/virtual_rotavap.py b/unilabos/devices/virtual/virtual_rotavap.py index 61e66a30..23e24b7e 100644 --- a/unilabos/devices/virtual/virtual_rotavap.py +++ b/unilabos/devices/virtual/virtual_rotavap.py @@ -99,8 +99,8 @@ class VirtualRotavap: self.logger.error(f"❌ 时间参数类型无效: {type(time)},使用默认值180.0秒") time = 180.0 - # 确保time是float类型 - time = float(time) + # 确保time是float类型; 并加速 + time = float(time) / 10.0 # 🔧 简化处理:如果vessel就是设备自己,直接操作 if vessel == self.device_id: diff --git a/unilabos/devices/virtual/virtual_solenoid_valve.py b/unilabos/devices/virtual/virtual_solenoid_valve.py index 54a1e6d0..e0194248 100644 --- a/unilabos/devices/virtual/virtual_solenoid_valve.py +++ b/unilabos/devices/virtual/virtual_solenoid_valve.py @@ -48,20 +48,6 @@ class VirtualSolenoidValve: """获取阀门位置状态""" return "OPEN" if self._is_open else "CLOSED" - @property - def state(self) -> dict: - """获取阀门完整状态""" - return { - "device_id": self.device_id, - "port": self.port, - "voltage": self.voltage, - "response_time": self.response_time, - "is_open": self._is_open, - "valve_state": self._valve_state, - "status": self._status, - "position": self.valve_position - } - async def set_valve_position(self, command: str = None, **kwargs): """ 设置阀门位置 - ROS动作接口 diff --git a/unilabos/registry/devices/virtual_device.yaml b/unilabos/registry/devices/virtual_device.yaml index af2ccf3e..9531158e 100644 --- a/unilabos/registry/devices/virtual_device.yaml +++ b/unilabos/registry/devices/virtual_device.yaml @@ -2161,8 +2161,6 @@ virtual_multiway_valve: type: SendCmd module: unilabos.devices.virtual.virtual_multiway_valve:VirtualMultiwayValve status_types: - available_ports: dict - available_positions: list current_port: str current_position: int flow_path: str @@ -2268,10 +2266,6 @@ virtual_multiway_valve: type: object data: properties: - available_ports: - type: object - available_positions: - type: array current_port: type: string current_position: @@ -2293,8 +2287,6 @@ virtual_multiway_valve: - target_position - current_port - valve_position - - available_positions - - available_ports - flow_path type: object version: 1.0.0 @@ -3775,7 +3767,6 @@ virtual_solenoid_valve: module: unilabos.devices.virtual.virtual_solenoid_valve:VirtualSolenoidValve status_types: is_open: bool - state: dict status: str valve_position: str valve_state: str @@ -3813,8 +3804,6 @@ virtual_solenoid_valve: properties: is_open: type: boolean - state: - type: object status: type: string valve_position: @@ -3826,7 +3815,6 @@ virtual_solenoid_valve: - valve_state - is_open - valve_position - - state type: object version: 1.0.0 virtual_solid_dispenser: diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index e7b4da21..1ed4bd6e 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -395,6 +395,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): if "data" not in resource: resource["data"] = {} resource["data"].update(json.loads(container_instance.data)) + request.resources[0].name = resource["name"] logger.info(f"更新物料{container_query_dict['name']}的数据{resource['data']} dict") else: logger.info(f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}") diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index eeb32ce1..07e89602 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -404,13 +404,14 @@ class HostNode(BaseROS2DeviceNode): class_name: str, parent: str, bind_locations: Point, - liquid_input_slot: list[int], - liquid_type: list[str], - liquid_volume: list[int], - slot_on_deck: str, + liquid_input_slot: list[int] = [], + liquid_type: list[str] = [], + liquid_volume: list[int] = [], + slot_on_deck: str = "", ): # 暂不支持多对同名父子同时存在 res_creation_input = { + "id": res_id.split("/")[-1], "name": res_id.split("/")[-1], "class": class_name, "parent": parent.split("/")[-1], @@ -424,8 +425,10 @@ class HostNode(BaseROS2DeviceNode): res_creation_input.update( { "data": { + "liquids": [{ "liquid_type": liquid_type[0] if liquid_type else None, "liquid_volume": liquid_volume[0] if liquid_volume else None, + }] } } )