From a6150367547a3124425956759edea5fb67579c7a Mon Sep 17 00:00:00 2001 From: ZiWei <131428629+ZiWei09@users.noreply.github.com> Date: Thu, 14 Aug 2025 10:02:33 +0800 Subject: [PATCH] pump_protocal.py Version IOAI --- unilabos/compile/pump_protocol.py | 1365 ++++++++++++++++++++++++----- 1 file changed, 1151 insertions(+), 214 deletions(-) diff --git a/unilabos/compile/pump_protocol.py b/unilabos/compile/pump_protocol.py index 22b9f10..364ef5d 100644 --- a/unilabos/compile/pump_protocol.py +++ b/unilabos/compile/pump_protocol.py @@ -1,5 +1,4 @@ import traceback -import traceback import numpy as np import networkx as nx import asyncio @@ -10,11 +9,8 @@ import sys from unilabos.compile.utils.vessel_parser import get_vessel -from unilabos.compile.utils.vessel_parser import get_vessel - logger = logging.getLogger(__name__) - def debug_print(message): """强制输出调试信息""" timestamp = time_module.strftime("%H:%M:%S") @@ -24,34 +20,33 @@ def debug_print(message): # 同时写入日志 logger.info(output) - def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: """ 从容器节点的数据中获取液体体积 """ debug_print(f"🔍 开始读取容器 '{vessel}' 的液体体积...") - + if vessel not in G.nodes(): logger.error(f"❌ 容器 '{vessel}' 不存在于系统图中") debug_print(f" - 系统中的容器: {list(G.nodes())}") return 0.0 - + vessel_data = G.nodes[vessel].get('data', {}) debug_print(f"📋 容器 '{vessel}' 的数据结构: {vessel_data}") - + total_volume = 0.0 - + # 方法1:检查 'liquid' 字段(列表格式) debug_print("🔍 方法1: 检查 'liquid' 字段...") if 'liquid' in vessel_data: liquids = vessel_data['liquid'] debug_print(f" - liquid 字段类型: {type(liquids)}") debug_print(f" - liquid 字段内容: {liquids}") - + if isinstance(liquids, list): debug_print(f" - liquid 是列表,包含 {len(liquids)} 个元素") for i, liquid in enumerate(liquids): - debug_print(f" 液体 {i + 1}: {liquid}") + debug_print(f" 液体 {i+1}: {liquid}") if isinstance(liquid, dict): volume_keys = ['liquid_volume', 'volume', 'amount', 'quantity'] for key in volume_keys: @@ -68,7 +63,7 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: debug_print(f" - liquid 不是列表: {type(liquids)}") else: debug_print(" - 没有 'liquid' 字段") - + # 方法2:检查直接的体积字段 debug_print("🔍 方法2: 检查直接体积字段...") volume_keys = ['total_volume', 'volume', 'liquid_volume', 'amount', 'current_volume'] @@ -82,7 +77,7 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: except (ValueError, TypeError) as e: logger.warning(f" ⚠️ 无法转换 '{key}': {vessel_data[key]} -> {str(e)}") continue - + # 方法3:检查 'state' 或 'status' 字段 debug_print("🔍 方法3: 检查 'state' 字段...") if 'state' in vessel_data and isinstance(vessel_data['state'], dict): @@ -97,11 +92,10 @@ def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: logger.warning(f" ⚠️ 无法转换 state.volume: {state['volume']} -> {str(e)}") else: debug_print(" - 没有 'state' 字段或不是字典") - + debug_print(f"📊 容器 '{vessel}' 最终检测体积: {total_volume}mL") return total_volume - def is_integrated_pump(node_name): return "pump" in node_name and "valve" in node_name @@ -112,43 +106,43 @@ def find_connected_pump(G, valve_node): 🔧 修复:区分电磁阀和多通阀,电磁阀不参与泵查找 """ debug_print(f"🔍 查找与阀门 {valve_node} 相连的泵...") - + # 🔧 关键修复:检查节点类型,电磁阀不应该查找泵 node_data = G.nodes.get(valve_node, {}) node_class = node_data.get("class", "") or "" - + debug_print(f" - 阀门类型: {node_class}") - + # 如果是电磁阀,不应该查找泵(电磁阀只是开关) if ("solenoid" in node_class.lower() or "solenoid_valve" in valve_node.lower()): debug_print(f" ⚠️ {valve_node} 是电磁阀,不应该查找泵节点") raise ValueError(f"电磁阀 {valve_node} 不应该参与泵查找逻辑") - + # 只有多通阀等复杂阀门才需要查找连接的泵 if ("multiway" in node_class.lower() or "valve" in node_class.lower()): debug_print(f" - {valve_node} 是多通阀,查找连接的泵...") + return valve_node # 方法1:直接相邻的泵 for neighbor in G.neighbors(valve_node): neighbor_class = G.nodes[neighbor].get("class", "") or "" # 排除非 电磁阀 和 泵 的邻居 - # 排除非 电磁阀 和 泵 的邻居 debug_print(f" - 检查邻居 {neighbor}, class: {neighbor_class}") if "pump" in neighbor_class.lower(): debug_print(f" ✅ 找到直接相连的泵: {neighbor}") return neighbor - + # 方法2:通过路径查找泵(最多2跳) debug_print(f" - 未找到直接相连的泵,尝试路径查找...") - + # 获取所有泵节点 pump_nodes = [] for node_id in G.nodes(): node_class = G.nodes[node_id].get("class", "") or "" if "pump" in node_class.lower(): pump_nodes.append(node_id) - + debug_print(f" - 系统中的泵节点: {pump_nodes}") - + # 查找到泵的最短路径 for pump_node in pump_nodes: try: @@ -156,13 +150,18 @@ def find_connected_pump(G, valve_node): path = nx.shortest_path(G, valve_node, pump_node) path_length = len(path) - 1 debug_print(f" - 到泵 {pump_node} 的路径: {path}, 距离: {path_length}") - + if path_length <= 2: # 最多允许2跳 debug_print(f" ✅ 通过路径找到泵: {pump_node}") return pump_node except nx.NetworkXNoPath: continue - + + # 方法3:降级方案 - 返回第一个可用的泵 + if pump_nodes: + debug_print(f" ⚠️ 未找到连接的泵,使用第一个可用的泵: {pump_nodes[0]}") + return pump_nodes[0] + # 最终失败 debug_print(f" ❌ 完全找不到泵节点") raise ValueError(f"未找到与阀 {valve_node} 相连的泵节点") @@ -175,26 +174,26 @@ def build_pump_valve_maps(G, pump_backbone): """ pumps_from_node = {} valve_from_node = {} - + debug_print(f"🔧 构建泵-阀门映射,原始骨架: {pump_backbone}") - + # 🔧 关键修复:过滤掉电磁阀 filtered_backbone = [] for node in pump_backbone: node_data = G.nodes.get(node, {}) node_class = node_data.get("class", "") or "" - + # 跳过电磁阀 if ("solenoid" in node_class.lower() or "solenoid_valve" in node.lower()): debug_print(f" - 跳过电磁阀: {node}") continue - + filtered_backbone.append(node) - + debug_print(f"🔧 过滤后的骨架: {filtered_backbone}") - + for node in filtered_backbone: - if is_integrated_pump(G.nodes[node]["class"]): + if is_integrated_pump(node): pumps_from_node[node] = node valve_from_node[node] = node debug_print(f" - 集成泵-阀: {node}") @@ -207,18 +206,18 @@ def build_pump_valve_maps(G, pump_backbone): except ValueError as e: debug_print(f" - 跳过节点 {node}: {str(e)}") continue - + debug_print(f"🔧 最终映射: pumps={pumps_from_node}, valves={valve_from_node}") return pumps_from_node, valve_from_node def generate_pump_protocol( - G: nx.DiGraph, - from_vessel_id: str, - to_vessel_id: str, - volume: float, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + volume: float, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, ) -> List[Dict[str, Any]]: """ 生成泵操作的动作序列 - 修复版本 @@ -226,40 +225,34 @@ def generate_pump_protocol( """ pump_action_sequence = [] nodes = G.nodes(data=True) - + # 验证输入参数 if volume <= 0: logger.error(f"无效的体积参数: {volume}mL") return pump_action_sequence - + if flowrate <= 0: flowrate = 2.5 logger.warning(f"flowrate <= 0,使用默认值 {flowrate}mL/s") - + if transfer_flowrate <= 0: transfer_flowrate = 0.5 logger.warning(f"transfer_flowrate <= 0,使用默认值 {transfer_flowrate}mL/s") - + # 验证容器存在 debug_print(f"🔍 验证源容器 '{from_vessel_id}' 和目标容器 '{to_vessel_id}' 是否存在...") - if from_vessel_id not in G.nodes(): - logger.error(f"源容器 '{from_vessel_id}' 不存在") - debug_print(f"🔍 验证源容器 '{from_vessel_id}' 和目标容器 '{to_vessel_id}' 是否存在...") if from_vessel_id not in G.nodes(): logger.error(f"源容器 '{from_vessel_id}' 不存在") return pump_action_sequence - + if to_vessel_id not in G.nodes(): logger.error(f"目标容器 '{to_vessel_id}' 不存在") return pump_action_sequence - + try: shortest_path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) debug_print(f"PUMP_TRANSFER: 路径 {from_vessel_id} -> {to_vessel_id}: {shortest_path}") - shortest_path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) - debug_print(f"PUMP_TRANSFER: 路径 {from_vessel_id} -> {to_vessel_id}: {shortest_path}") except nx.NetworkXNoPath: - logger.error(f"无法找到从 '{from_vessel_id}' 到 '{to_vessel_id}' 的路径") logger.error(f"无法找到从 '{from_vessel_id}' 到 '{to_vessel_id}' 的路径") return pump_action_sequence @@ -267,21 +260,20 @@ def generate_pump_protocol( pump_backbone = [] for node in shortest_path: # 跳过起始和结束容器 - if node == from_vessel_id or node == to_vessel_id: if node == from_vessel_id or node == to_vessel_id: continue - + # 跳过电磁阀(电磁阀不参与泵操作) node_data = G.nodes.get(node, {}) node_class = node_data.get("class", "") or "" if ("solenoid" in node_class.lower() or "solenoid_valve" in node.lower()): debug_print(f"PUMP_TRANSFER: 跳过电磁阀 {node}") continue - + # 只包含多通阀和泵 if ("multiway" in node_class.lower() or "valve" in node_class.lower() or "pump" in node_class.lower()): pump_backbone.append(node) - + debug_print(f"PUMP_TRANSFER: 过滤后的泵骨架: {pump_backbone}") if not pump_backbone: @@ -312,7 +304,7 @@ def generate_pump_protocol( max_volume = pump_config.get("max_volume") if max_volume is not None: min_transfer_volumes.append(max_volume) - + if min_transfer_volumes: min_transfer_volume = min(min_transfer_volumes) else: @@ -323,7 +315,7 @@ def generate_pump_protocol( min_transfer_volume = 25.0 # 默认值 repeats = int(np.ceil(volume / min_transfer_volume)) - + if repeats > 1 and (from_vessel_id.startswith("pump") or to_vessel_id.startswith("pump")): logger.error("Cannot transfer volume larger than min_transfer_volume between two pumps.") return pump_action_sequence @@ -340,7 +332,7 @@ def generate_pump_protocol( def create_progress_log_action(message: str) -> Dict[str, Any]: """创建一个特殊的等待动作,在执行时打印进度日志""" return { - "action_name": "wait", + "action_name": "wait", "action_kwargs": { "time": 0.1, # 很短的等待时间 "progress_message": message # 自定义字段,用于进度日志 @@ -350,12 +342,12 @@ def generate_pump_protocol( # 生成泵操作序列 for i in range(repeats): current_volume = min(volume_left, min_transfer_volume) - + # 🆕 在每次循环开始时添加进度日志 if repeats > 1: - start_message = f"🚀 准备开始第 {i + 1}/{repeats} 次转移: {current_volume:.2f}mL ({from_vessel_id} → {to_vessel_id}) 🚰" + start_message = f"🚀 准备开始第 {i+1}/{repeats} 次转移: {current_volume:.2f}mL ({from_vessel_id} → {to_vessel_id}) 🚰" pump_action_sequence.append(create_progress_log_action(start_message)) - + # 🔧 修复:安全地获取边数据 def get_safe_edge_data(node_a, node_b, key): try: @@ -368,13 +360,11 @@ def generate_pump_protocol( except Exception as e: debug_print(f"PUMP_TRANSFER: 获取边数据失败 {node_a}->{node_b}: {str(e)}") return "default" - + # 从源容器吸液 - if not from_vessel_id.startswith("pump") and pump_backbone: if not from_vessel_id.startswith("pump") and pump_backbone: first_pump_node = pump_backbone[0] if first_pump_node in valve_from_node and first_pump_node in pumps_from_node: - port_command = get_safe_edge_data(first_pump_node, from_vessel_id, first_pump_node) port_command = get_safe_edge_data(first_pump_node, from_vessel_id, first_pump_node) pump_action_sequence.extend([ { @@ -394,13 +384,13 @@ def generate_pump_protocol( } ]) pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) - + # 泵间转移 for nodeA, nodeB in zip(pump_backbone[:-1], pump_backbone[1:]): if nodeA in valve_from_node and nodeB in valve_from_node and nodeA in pumps_from_node and nodeB in pumps_from_node: port_a = get_safe_edge_data(nodeA, nodeB, nodeA) port_b = get_safe_edge_data(nodeB, nodeA, nodeB) - + pump_action_sequence.append([ { "device_id": valve_from_node[nodeA], @@ -438,11 +428,9 @@ def generate_pump_protocol( pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) # 排液到目标容器 - if not to_vessel_id.startswith("pump") and pump_backbone: if not to_vessel_id.startswith("pump") and pump_backbone: last_pump_node = pump_backbone[-1] if last_pump_node in valve_from_node and last_pump_node in pumps_from_node: - port_command = get_safe_edge_data(last_pump_node, to_vessel_id, last_pump_node) port_command = get_safe_edge_data(last_pump_node, to_vessel_id, last_pump_node) pump_action_sequence.extend([ { @@ -467,75 +455,70 @@ def generate_pump_protocol( if repeats > 1: remaining_volume = volume_left - current_volume if remaining_volume > 0: - end_message = f"✅ 第 {i + 1}/{repeats} 次转移完成! 剩余 {remaining_volume:.2f}mL 待转移 ⏳" + end_message = f"✅ 第 {i+1}/{repeats} 次转移完成! 剩余 {remaining_volume:.2f}mL 待转移 ⏳" else: - end_message = f"🎉 第 {i + 1}/{repeats} 次转移完成! 全部 {volume:.2f}mL 转移完毕 ✨" - + end_message = f"🎉 第 {i+1}/{repeats} 次转移完成! 全部 {volume:.2f}mL 转移完毕 ✨" + pump_action_sequence.append(create_progress_log_action(end_message)) volume_left -= current_volume - + return pump_action_sequence -# 保持原有的同步版本兼容性 def generate_pump_protocol_with_rinsing( - G: nx.DiGraph, - from_vessel: dict, - to_vessel: dict, - volume: float = 0.0, - amount: str = "", - time: float = 0.0, - viscous: bool = False, - rinsing_solvent: str = "", - rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, - solid: bool = False, - flowrate: float = 2.5, - transfer_flowrate: float = 0.5, - rate_spec: str = "", - event: str = "", - through: str = "", - **kwargs + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + volume: float = 0.0, + amount: str = "", + time: float = 0.0, # 🔧 修复:统一使用 time + viscous: bool = False, + rinsing_solvent: str = "", + rinsing_volume: float = 0.0, + rinsing_repeats: int = 0, + solid: bool = False, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, + rate_spec: str = "", + event: str = "", + through: str = "", + **kwargs ) -> List[Dict[str, Any]]: """ 原有的同步版本,添加防冲突机制 """ - + # 添加执行锁,防止并发调用 import threading if not hasattr(generate_pump_protocol_with_rinsing, '_lock'): generate_pump_protocol_with_rinsing._lock = threading.Lock() - - from_vessel_id, _ = get_vessel(from_vessel) - to_vessel_id, _ = get_vessel(to_vessel) - + with generate_pump_protocol_with_rinsing._lock: debug_print("=" * 60) debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (同步版本)") debug_print(f" 📍 路径: {from_vessel_id} -> {to_vessel_id}") - debug_print(f" 📍 路径: {from_vessel_id} -> {to_vessel_id}") debug_print(f" 🕐 时间戳: {time_module.time()}") debug_print(f" 🔒 获得执行锁") debug_print("=" * 60) - + # 短暂延迟,避免快速重复调用 time_module.sleep(0.01) - + debug_print("🔍 步骤1: 开始体积处理...") - + # 1. 处理体积参数 final_volume = volume debug_print(f"📋 初始设置: final_volume = {final_volume}") - + # 🔧 修复:如果volume为0(ROS2传入的空值),从容器读取实际体积 if volume == 0.0: debug_print("🎯 检测到 volume=0.0,开始自动体积检测...") - + # 直接从源容器读取实际体积 actual_volume = get_vessel_liquid_volume(G, from_vessel_id) debug_print(f"📖 从容器 '{from_vessel_id}' 读取到体积: {actual_volume}mL") - + if actual_volume > 0: final_volume = actual_volume debug_print(f"✅ 成功设置体积为: {final_volume}mL") @@ -544,66 +527,65 @@ def generate_pump_protocol_with_rinsing( logger.warning(f"⚠️ 无法从容器读取体积,使用默认值: {final_volume}mL") else: debug_print(f"📌 体积非零,直接使用: {final_volume}mL") - + # 处理 amount 参数 if amount and amount.strip(): debug_print(f"🔍 检测到 amount 参数: '{amount}',开始解析...") parsed_volume = _parse_amount_to_volume(amount) debug_print(f"📖 从 amount 解析得到体积: {parsed_volume}mL") - + if parsed_volume > 0: final_volume = parsed_volume debug_print(f"✅ 使用从 amount 解析的体积: {final_volume}mL") elif parsed_volume == 0.0 and amount.lower().strip() == "all": debug_print("🎯 检测到 amount='all',从容器读取全部体积...") actual_volume = get_vessel_liquid_volume(G, from_vessel_id) - actual_volume = get_vessel_liquid_volume(G, from_vessel_id) if actual_volume > 0: final_volume = actual_volume debug_print(f"✅ amount='all',设置体积为: {final_volume}mL") - + # 最终体积验证 debug_print(f"🔍 步骤2: 最终体积验证...") if final_volume <= 0: logger.error(f"❌ 体积无效: {final_volume}mL") final_volume = 10.0 logger.warning(f"⚠️ 强制设置为默认值: {final_volume}mL") - + debug_print(f"✅ 最终确定体积: {final_volume}mL") - + # 2. 处理流速参数 debug_print(f"🔍 步骤3: 处理流速参数...") debug_print(f" - 原始 flowrate: {flowrate}") debug_print(f" - 原始 transfer_flowrate: {transfer_flowrate}") - + final_flowrate = flowrate if flowrate > 0 else 2.5 final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 - + if flowrate <= 0: logger.warning(f"⚠️ flowrate <= 0,修正为: {final_flowrate}mL/s") if transfer_flowrate <= 0: logger.warning(f"⚠️ transfer_flowrate <= 0,修正为: {final_transfer_flowrate}mL/s") - + debug_print(f"✅ 修正后流速: flowrate={final_flowrate}mL/s, transfer_flowrate={final_transfer_flowrate}mL/s") - + # 3. 根据时间计算流速 if time > 0 and final_volume > 0: debug_print(f"🔍 步骤4: 根据时间计算流速...") calculated_flowrate = final_volume / time debug_print(f" - 计算得到流速: {calculated_flowrate}mL/s") - + if flowrate <= 0 or flowrate == 2.5: final_flowrate = min(calculated_flowrate, 10.0) debug_print(f" - 调整 flowrate 为: {final_flowrate}mL/s") if transfer_flowrate <= 0 or transfer_flowrate == 0.5: final_transfer_flowrate = min(calculated_flowrate, 5.0) debug_print(f" - 调整 transfer_flowrate 为: {final_transfer_flowrate}mL/s") - + # 4. 根据速度规格调整 if rate_spec: debug_print(f"🔍 步骤5: 根据速度规格调整...") debug_print(f" - 速度规格: '{rate_spec}'") - + if rate_spec == "dropwise": final_flowrate = min(final_flowrate, 0.1) final_transfer_flowrate = min(final_transfer_flowrate, 0.1) @@ -616,86 +598,490 @@ def generate_pump_protocol_with_rinsing( final_flowrate = max(final_flowrate, 5.0) final_transfer_flowrate = max(final_transfer_flowrate, 2.0) debug_print(f" - quickly模式,流速调整为: {final_flowrate}mL/s") + + try: + # 🆕 修复:在这里调用带有循环日志的generate_pump_protocol_with_loop_logging函数 + pump_action_sequence = generate_pump_protocol_with_loop_logging( + G, from_vessel_id, to_vessel_id, final_volume, + final_flowrate, final_transfer_flowrate + ) + + debug_print(f"🔓 释放执行锁") + return pump_action_sequence + + except Exception as e: + logger.error(f"❌ 协议生成失败: {str(e)}") + return [ + { + "device_id": "system", + "action_name": "log_message", + "action_kwargs": { + "message": f"❌ 协议生成失败: {str(e)}" + } + } + ] - # 5. 处理冲洗参数 - debug_print(f"🔍 步骤6: 处理冲洗参数...") - final_rinsing_solvent = rinsing_solvent - final_rinsing_volume = rinsing_volume if rinsing_volume > 0 else 5.0 - final_rinsing_repeats = rinsing_repeats if rinsing_repeats > 0 else 2 - if rinsing_volume <= 0: - logger.warning(f"⚠️ rinsing_volume <= 0,修正为: {final_rinsing_volume}mL") - if rinsing_repeats <= 0: - logger.warning(f"⚠️ rinsing_repeats <= 0,修正为: {final_rinsing_repeats}次") +def generate_pump_protocol_with_loop_logging( + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + volume: float, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, +) -> List[Dict[str, Any]]: + """ + 生成泵操作的动作序列 - 带循环日志版本 + 🔧 修复:正确处理包含电磁阀的路径,并在合适时机打印循环日志 + """ + pump_action_sequence = [] + nodes = G.nodes(data=True) + + # 验证输入参数 + if volume <= 0: + logger.error(f"无效的体积参数: {volume}mL") + return pump_action_sequence + + if flowrate <= 0: + flowrate = 2.5 + logger.warning(f"flowrate <= 0,使用默认值 {flowrate}mL/s") + + if transfer_flowrate <= 0: + transfer_flowrate = 0.5 + logger.warning(f"transfer_flowrate <= 0,使用默认值 {transfer_flowrate}mL/s") + + # 验证容器存在 + if from_vessel_id not in G.nodes(): + logger.error(f"源容器 '{from_vessel_id}' 不存在") + return pump_action_sequence + + if to_vessel_id not in G.nodes(): + logger.error(f"目标容器 '{to_vessel_id}' 不存在") + return pump_action_sequence + + try: + shortest_path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) + debug_print(f"PUMP_TRANSFER: 路径 {from_vessel_id} -> {to_vessel_id}: {shortest_path}") + except nx.NetworkXNoPath: + logger.error(f"无法找到从 '{from_vessel_id}' 到 '{to_vessel_id}' 的路径") + return pump_action_sequence - # 根据物理属性调整冲洗参数 - if viscous or solid: - final_rinsing_repeats = max(final_rinsing_repeats, 3) - final_rinsing_volume = max(final_rinsing_volume, 10.0) - debug_print(f"🧪 粘稠/固体物质,调整冲洗参数:{final_rinsing_repeats}次,{final_rinsing_volume}mL") + # 🔧 关键修复:正确构建泵骨架,排除容器和电磁阀 + pump_backbone = [] + for node in shortest_path: + # 跳过起始和结束容器 + if node == from_vessel_id or node == to_vessel_id: + continue + + # 跳过电磁阀(电磁阀不参与泵操作) + node_data = G.nodes.get(node, {}) + node_class = node_data.get("class", "") or "" + if ("solenoid" in node_class.lower() or "solenoid_valve" in node.lower()): + debug_print(f"PUMP_TRANSFER: 跳过电磁阀 {node}") + continue + + # 只包含多通阀和泵 + if ("multiway" in node_class.lower() or "valve" in node_class.lower() or "pump" in node_class.lower()): + pump_backbone.append(node) + + debug_print(f"PUMP_TRANSFER: 过滤后的泵骨架: {pump_backbone}") + if not pump_backbone: + debug_print("PUMP_TRANSFER: 没有泵骨架节点,可能是直接容器连接或只有电磁阀") + return pump_action_sequence + + if transfer_flowrate == 0: + transfer_flowrate = flowrate + + try: + pumps_from_node, valve_from_node = build_pump_valve_maps(G, pump_backbone) + except Exception as e: + debug_print(f"PUMP_TRANSFER: 构建泵-阀门映射失败: {str(e)}") + return pump_action_sequence + + if not pumps_from_node: + debug_print("PUMP_TRANSFER: 没有可用的泵映射") + return pump_action_sequence + + # 🔧 修复:安全地获取最小转移体积 + try: + min_transfer_volumes = [] + for node in pump_backbone: + if node in pumps_from_node: + pump_node = pumps_from_node[node] + if pump_node in nodes: + pump_config = nodes[pump_node].get("config", {}) + max_volume = pump_config.get("max_volume") + if max_volume is not None: + min_transfer_volumes.append(max_volume) + + if min_transfer_volumes: + min_transfer_volume = min(min_transfer_volumes) + else: + min_transfer_volume = 25.0 # 默认值 + debug_print(f"PUMP_TRANSFER: 无法获取泵的最大体积,使用默认值: {min_transfer_volume}mL") + except Exception as e: + debug_print(f"PUMP_TRANSFER: 获取最小转移体积失败: {str(e)}") + min_transfer_volume = 25.0 # 默认值 + + repeats = int(np.ceil(volume / min_transfer_volume)) + + if repeats > 1 and (from_vessel_id.startswith("pump") or to_vessel_id.startswith("pump")): + logger.error("Cannot transfer volume larger than min_transfer_volume between two pumps.") + return pump_action_sequence + + volume_left = volume + debug_print(f"PUMP_TRANSFER: 需要 {repeats} 次转移,单次最大体积 {min_transfer_volume} mL") + + # 🆕 只在开头打印总体概览 + if repeats > 1: + debug_print(f"🔄 分批转移概览: 总体积 {volume:.2f}mL,需要 {repeats} 次转移") + logger.info(f"🔄 分批转移概览: 总体积 {volume:.2f}mL,需要 {repeats} 次转移") + + # 🔧 创建一个自定义的wait动作,用于在执行时打印日志 + def create_progress_log_action(message: str) -> Dict[str, Any]: + """创建一个特殊的等待动作,在执行时打印进度日志""" + return { + "action_name": "wait", + "action_kwargs": { + "time": 0.1, # 很短的等待时间 + "progress_message": message # 自定义字段,用于进度日志 + } + } + + # 生成泵操作序列 + for i in range(repeats): + current_volume = min(volume_left, min_transfer_volume) + + # 🆕 在每次循环开始时添加进度日志 + if repeats > 1: + start_message = f"🚀 准备开始第 {i+1}/{repeats} 次转移: {current_volume:.2f}mL ({from_vessel_id} → {to_vessel_id}) 🚰" + pump_action_sequence.append(create_progress_log_action(start_message)) + + # 🔧 修复:安全地获取边数据 + def get_safe_edge_data(node_a, node_b, key): + try: + edge_data = G.get_edge_data(node_a, node_b) + if edge_data and "port" in edge_data: + port_data = edge_data["port"] + if isinstance(port_data, dict) and key in port_data: + return port_data[key] + return "default" + except Exception as e: + debug_print(f"PUMP_TRANSFER: 获取边数据失败 {node_a}->{node_b}: {str(e)}") + return "default" + + # 从源容器吸液 + if not from_vessel_id.startswith("pump") and pump_backbone: + first_pump_node = pump_backbone[0] + if first_pump_node in valve_from_node and first_pump_node in pumps_from_node: + port_command = get_safe_edge_data(first_pump_node, from_vessel_id, first_pump_node) + pump_action_sequence.extend([ + { + "device_id": valve_from_node[first_pump_node], + "action_name": "set_valve_position", + "action_kwargs": { + "command": port_command + } + }, + { + "device_id": pumps_from_node[first_pump_node], + "action_name": "set_position", + "action_kwargs": { + "position": float(current_volume), + "max_velocity": transfer_flowrate + } + } + ]) + pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) + + # 泵间转移 + for nodeA, nodeB in zip(pump_backbone[:-1], pump_backbone[1:]): + if nodeA in valve_from_node and nodeB in valve_from_node and nodeA in pumps_from_node and nodeB in pumps_from_node: + port_a = get_safe_edge_data(nodeA, nodeB, nodeA) + port_b = get_safe_edge_data(nodeB, nodeA, nodeB) + + pump_action_sequence.append([ + { + "device_id": valve_from_node[nodeA], + "action_name": "set_valve_position", + "action_kwargs": { + "command": port_a + } + }, + { + "device_id": valve_from_node[nodeB], + "action_name": "set_valve_position", + "action_kwargs": { + "command": port_b + } + } + ]) + pump_action_sequence.append([ + { + "device_id": pumps_from_node[nodeA], + "action_name": "set_position", + "action_kwargs": { + "position": 0.0, + "max_velocity": transfer_flowrate + } + }, + { + "device_id": pumps_from_node[nodeB], + "action_name": "set_position", + "action_kwargs": { + "position": float(current_volume), + "max_velocity": transfer_flowrate + } + } + ]) + pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) + + # 排液到目标容器 + if not to_vessel_id.startswith("pump") and pump_backbone: + last_pump_node = pump_backbone[-1] + if last_pump_node in valve_from_node and last_pump_node in pumps_from_node: + port_command = get_safe_edge_data(last_pump_node, to_vessel_id, last_pump_node) + pump_action_sequence.extend([ + { + "device_id": valve_from_node[last_pump_node], + "action_name": "set_valve_position", + "action_kwargs": { + "command": port_command + } + }, + { + "device_id": pumps_from_node[last_pump_node], + "action_name": "set_position", + "action_kwargs": { + "position": 0.0, + "max_velocity": flowrate + } + } + ]) + pump_action_sequence.append({"action_name": "wait", "action_kwargs": {"time": 3}}) + + # 🆕 在每次循环结束时添加完成日志 + if repeats > 1: + remaining_volume = volume_left - current_volume + if remaining_volume > 0: + end_message = f"✅ 第 {i+1}/{repeats} 次转移完成! 剩余 {remaining_volume:.2f}mL 待转移 ⏳" + else: + end_message = f"🎉 第 {i+1}/{repeats} 次转移完成! 全部 {volume:.2f}mL 转移完毕 ✨" + + pump_action_sequence.append(create_progress_log_action(end_message)) + + volume_left -= current_volume + + return pump_action_sequence + + +def generate_pump_protocol_with_rinsing( + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + volume: float = 0.0, + amount: str = "", + time: float = 0.0, # 🔧 修复:统一使用 time + viscous: bool = False, + rinsing_solvent: str = "", + rinsing_volume: float = 0.0, + rinsing_repeats: int = 0, + solid: bool = False, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, + rate_spec: str = "", + event: str = "", + through: str = "", + **kwargs +) -> List[Dict[str, Any]]: + """ + 增强兼容性的泵转移协议生成器,支持自动体积检测 + """ + debug_print("=" * 60) + debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议") + debug_print(f" 📍 路径: {from_vessel_id} -> {to_vessel_id}") + debug_print(f" 🕐 时间戳: {time_module.time()}") + debug_print(f" 📊 原始参数:") + debug_print(f" - volume: {volume} (类型: {type(volume)})") + debug_print(f" - amount: '{amount}'") + debug_print(f" - time: {time}") # 🔧 修复:统一使用 time + debug_print(f" - flowrate: {flowrate}") + debug_print(f" - transfer_flowrate: {transfer_flowrate}") + debug_print(f" - rate_spec: '{rate_spec}'") + debug_print("=" * 60) + + # ========== 🔧 核心修复:智能体积处理 ========== + + debug_print("🔍 步骤1: 开始体积处理...") + + # 1. 处理体积参数 + final_volume = volume + debug_print(f"📋 初始设置: final_volume = {final_volume}") + + # 🔧 修复:如果volume为0(ROS2传入的空值),从容器读取实际体积 + if volume == 0.0: + debug_print("🎯 检测到 volume=0.0,开始自动体积检测...") + + # 直接从源容器读取实际体积 + actual_volume = get_vessel_liquid_volume(G, from_vessel_id) + debug_print(f"📖 从容器 '{from_vessel_id}' 读取到体积: {actual_volume}mL") + + if actual_volume > 0: + final_volume = actual_volume + debug_print(f"✅ 成功设置体积为: {final_volume}mL") + else: + final_volume = 10.0 # 如果读取失败,使用默认值 + debug_print(f"⚠️ 无法从容器读取体积,使用默认值: {final_volume}mL") + else: + debug_print(f"📌 体积非零,直接使用: {final_volume}mL") + + # 处理 amount 参数 + if amount and amount.strip(): + debug_print(f"🔍 检测到 amount 参数: '{amount}',开始解析...") + parsed_volume = _parse_amount_to_volume(amount) + debug_print(f"📖 从 amount 解析得到体积: {parsed_volume}mL") + + if parsed_volume > 0: + final_volume = parsed_volume + debug_print(f"✅ 使用从 amount 解析的体积: {final_volume}mL") + elif parsed_volume == 0.0 and amount.lower().strip() == "all": + debug_print("🎯 检测到 amount='all',从容器读取全部体积...") + actual_volume = get_vessel_liquid_volume(G, from_vessel_id) + if actual_volume > 0: + final_volume = actual_volume + debug_print(f"✅ amount='all',设置体积为: {final_volume}mL") + + # 最终体积验证 + debug_print(f"🔍 步骤2: 最终体积验证...") + if final_volume <= 0: + debug_print(f"❌ 体积无效: {final_volume}mL") + final_volume = 10.0 + debug_print(f"⚠️ 强制设置为默认值: {final_volume}mL") + + debug_print(f"✅ 最终确定体积: {final_volume}mL") + + # 2. 处理流速参数 + debug_print(f"🔍 步骤3: 处理流速参数...") + debug_print(f" - 原始 flowrate: {flowrate}") + debug_print(f" - 原始 transfer_flowrate: {transfer_flowrate}") + + final_flowrate = flowrate if flowrate > 0 else 2.5 + final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 + + if flowrate <= 0: + debug_print(f"⚠️ flowrate <= 0,修正为: {final_flowrate}mL/s") + if transfer_flowrate <= 0: + debug_print(f"⚠️ transfer_flowrate <= 0,修正为: {final_transfer_flowrate}mL/s") + + debug_print(f"✅ 修正后流速: flowrate={final_flowrate}mL/s, transfer_flowrate={final_transfer_flowrate}mL/s") + + # 3. 根据时间计算流速 + if time > 0 and final_volume > 0: # 🔧 修复:统一使用 time + debug_print(f"🔍 步骤4: 根据时间计算流速...") + calculated_flowrate = final_volume / time + debug_print(f" - 计算得到流速: {calculated_flowrate}mL/s") + + if flowrate <= 0 or flowrate == 2.5: + final_flowrate = min(calculated_flowrate, 10.0) + debug_print(f" - 调整 flowrate 为: {final_flowrate}mL/s") + if transfer_flowrate <= 0 or transfer_flowrate == 0.5: + final_transfer_flowrate = min(calculated_flowrate, 5.0) + debug_print(f" - 调整 transfer_flowrate 为: {final_transfer_flowrate}mL/s") + + # 4. 根据速度规格调整 + if rate_spec: + debug_print(f"🔍 步骤5: 根据速度规格调整...") + debug_print(f" - 速度规格: '{rate_spec}'") + + if rate_spec == "dropwise": + final_flowrate = min(final_flowrate, 0.1) + final_transfer_flowrate = min(final_transfer_flowrate, 0.1) + debug_print(f" - dropwise模式,流速调整为: {final_flowrate}mL/s") + elif rate_spec == "slowly": + final_flowrate = min(final_flowrate, 0.5) + final_transfer_flowrate = min(final_transfer_flowrate, 0.3) + debug_print(f" - slowly模式,流速调整为: {final_flowrate}mL/s") + elif rate_spec == "quickly": + final_flowrate = max(final_flowrate, 5.0) + final_transfer_flowrate = max(final_transfer_flowrate, 2.0) + debug_print(f" - quickly模式,流速调整为: {final_flowrate}mL/s") + + # # 5. 处理冲洗参数 + # debug_print(f"🔍 步骤6: 处理冲洗参数...") + # final_rinsing_solvent = rinsing_solvent + # final_rinsing_volume = rinsing_volume if rinsing_volume > 0 else 5.0 + # final_rinsing_repeats = rinsing_repeats if rinsing_repeats > 0 else 2 + + # if rinsing_volume <= 0: + # debug_print(f"⚠️ rinsing_volume <= 0,修正为: {final_rinsing_volume}mL") + # if rinsing_repeats <= 0: + # debug_print(f"⚠️ rinsing_repeats <= 0,修正为: {final_rinsing_repeats}次") + + # # 根据物理属性调整冲洗参数 + # if viscous or solid: + # final_rinsing_repeats = max(final_rinsing_repeats, 3) + # final_rinsing_volume = max(final_rinsing_volume, 10.0) + # debug_print(f"🧪 粘稠/固体物质,调整冲洗参数:{final_rinsing_repeats}次,{final_rinsing_volume}mL") + # 参数总结 debug_print("📊 最终参数总结:") debug_print(f" - 体积: {final_volume}mL") debug_print(f" - 流速: {final_flowrate}mL/s") debug_print(f" - 转移流速: {final_transfer_flowrate}mL/s") - debug_print(f" - 冲洗溶剂: '{final_rinsing_solvent}'") - debug_print(f" - 冲洗体积: {final_rinsing_volume}mL") - debug_print(f" - 冲洗次数: {final_rinsing_repeats}次") - + # debug_print(f" - 冲洗溶剂: '{final_rinsing_solvent}'") + # debug_print(f" - 冲洗体积: {final_rinsing_volume}mL") + # debug_print(f" - 冲洗次数: {final_rinsing_repeats}次") + # ========== 执行基础转移 ========== - + debug_print("🔧 步骤7: 开始执行基础转移...") - + try: debug_print(f" - 调用 generate_pump_protocol...") - debug_print( - f" - 参数: G, '{from_vessel_id}', '{to_vessel_id}', {final_volume}, {final_flowrate}, {final_transfer_flowrate}") - + debug_print(f" - 参数: G, '{from_vessel_id}', '{to_vessel_id}', {final_volume}, {final_flowrate}, {final_transfer_flowrate}") + pump_action_sequence = generate_pump_protocol( - G, from_vessel_id, to_vessel_id, final_volume, G, from_vessel_id, to_vessel_id, final_volume, final_flowrate, final_transfer_flowrate ) - + debug_print(f" - generate_pump_protocol 返回结果:") debug_print(f" - 动作序列长度: {len(pump_action_sequence)}") debug_print(f" - 动作序列是否为空: {len(pump_action_sequence) == 0}") - + if not pump_action_sequence: debug_print("❌ 基础转移协议生成为空,可能是路径问题") debug_print(f" - 源容器存在: {from_vessel_id in G.nodes()}") debug_print(f" - 目标容器存在: {to_vessel_id in G.nodes()}") - + if from_vessel_id in G.nodes() and to_vessel_id in G.nodes(): try: - path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) debug_print(f" - 路径存在: {path}") except Exception as path_error: debug_print(f" - 无法找到路径: {str(path_error)}") - + return [ { "device_id": "system", "action_name": "log_message", "action_kwargs": { "message": f"⚠️ 路径问题,无法转移: {final_volume}mL 从 {from_vessel_id} 到 {to_vessel_id}" - "message": f"⚠️ 路径问题,无法转移: {final_volume}mL 从 {from_vessel_id} 到 {to_vessel_id}" } } ] - + debug_print(f"✅ 基础转移生成了 {len(pump_action_sequence)} 个动作") - + # 打印前几个动作用于调试 if len(pump_action_sequence) > 0: debug_print("🔍 前几个动作预览:") for i, action in enumerate(pump_action_sequence[:3]): - debug_print(f" 动作 {i + 1}: {action}") + debug_print(f" 动作 {i+1}: {action}") if len(pump_action_sequence) > 3: debug_print(f" ... 还有 {len(pump_action_sequence) - 3} 个动作") - + except Exception as e: debug_print(f"❌ 基础转移失败: {str(e)}") import traceback @@ -706,52 +1092,51 @@ def generate_pump_protocol_with_rinsing( "action_name": "log_message", "action_kwargs": { "message": f"❌ 转移失败: {final_volume}mL 从 {from_vessel_id} 到 {to_vessel_id}, 错误: {str(e)}" - "message": f"❌ 转移失败: {final_volume}mL 从 {from_vessel_id} 到 {to_vessel_id}, 错误: {str(e)}" } } ] - + # ========== 执行冲洗操作 ========== - - debug_print("🔧 步骤8: 检查冲洗操作...") - - if final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0: - debug_print(f"🧽 开始冲洗操作,溶剂: '{final_rinsing_solvent}'") - - try: - if final_rinsing_solvent.strip() != "air": - debug_print(" - 执行液体冲洗...") - rinsing_actions = _generate_rinsing_sequence( - G, from_vessel_id, to_vessel_id, final_rinsing_solvent, - final_rinsing_volume, final_rinsing_repeats, - final_flowrate, final_transfer_flowrate - ) - pump_action_sequence.extend(rinsing_actions) - debug_print(f" - 添加了 {len(rinsing_actions)} 个冲洗动作") - else: - debug_print(" - 执行空气冲洗...") - air_rinsing_actions = _generate_air_rinsing_sequence( - G, from_vessel_id, to_vessel_id, final_rinsing_volume, final_rinsing_repeats, - final_flowrate, final_transfer_flowrate - ) - pump_action_sequence.extend(air_rinsing_actions) - debug_print(f" - 添加了 {len(air_rinsing_actions)} 个空气冲洗动作") - except Exception as e: - debug_print(f"⚠️ 冲洗操作失败: {str(e)},跳过冲洗") - else: - debug_print(f"⏭️ 跳过冲洗操作") - debug_print(f" - 溶剂: '{final_rinsing_solvent}'") - debug_print(f" - 次数: {final_rinsing_repeats}") - debug_print(f" - 条件满足: {bool(final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0)}") - + + # debug_print("🔧 步骤8: 检查冲洗操作...") + + # if final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0: + # debug_print(f"🧽 开始冲洗操作,溶剂: '{final_rinsing_solvent}'") + + # try: + # if final_rinsing_solvent.strip() != "air": + # debug_print(" - 执行液体冲洗...") + # rinsing_actions = _generate_rinsing_sequence( + # G, from_vessel_id, to_vessel_id, final_rinsing_solvent, + # final_rinsing_volume, final_rinsing_repeats, + # final_flowrate, final_transfer_flowrate + # ) + # pump_action_sequence.extend(rinsing_actions) + # debug_print(f" - 添加了 {len(rinsing_actions)} 个冲洗动作") + # else: + # debug_print(" - 执行空气冲洗...") + # air_rinsing_actions = _generate_air_rinsing_sequence( + # G, from_vessel_id, to_vessel_id, final_rinsing_volume, final_rinsing_repeats, + # final_flowrate, final_transfer_flowrate + # ) + # pump_action_sequence.extend(air_rinsing_actions) + # debug_print(f" - 添加了 {len(air_rinsing_actions)} 个空气冲洗动作") + # except Exception as e: + # debug_print(f"⚠️ 冲洗操作失败: {str(e)},跳过冲洗") + # else: + # debug_print(f"⏭️ 跳过冲洗操作") + # debug_print(f" - 溶剂: '{final_rinsing_solvent}'") + # debug_print(f" - 次数: {final_rinsing_repeats}") + # debug_print(f" - 条件满足: {bool(final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0)}") + # ========== 最终结果 ========== - + debug_print("=" * 60) debug_print(f"🎉 PUMP_TRANSFER: 协议生成完成") debug_print(f" 📊 总动作数: {len(pump_action_sequence)}") debug_print(f" 📋 最终体积: {final_volume}mL") debug_print(f" 🚀 执行路径: {from_vessel_id} -> {to_vessel_id}") - + # 最终验证 if len(pump_action_sequence) == 0: debug_print("🚨 协议生成结果为空!这是异常情况") @@ -764,15 +1149,581 @@ def generate_pump_protocol_with_rinsing( } } ] - + debug_print("=" * 60) return pump_action_sequence +async def generate_pump_protocol_with_rinsing_async( + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + volume: float = 0.0, + amount: str = "", + time: float = 0.0, + viscous: bool = False, + rinsing_solvent: str = "", + rinsing_volume: float = 0.0, + rinsing_repeats: int = 0, + solid: bool = False, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, + rate_spec: str = "", + event: str = "", + through: str = "", + **kwargs +) -> List[Dict[str, Any]]: + """ + 异步版本的泵转移协议生成器,避免并发问题 + """ + debug_print("=" * 60) + debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (异步版本)") + debug_print(f" 📍 路径: {from_vessel_id} -> {to_vessel_id}") + debug_print(f" 🕐 时间戳: {time_module.time()}") + debug_print("=" * 60) + + # 添加唯一标识符 + protocol_id = f"pump_transfer_{int(time_module.time() * 1000000)}" + debug_print(f"📋 协议ID: {protocol_id}") + + # 调用原有的同步版本 + result = generate_pump_protocol_with_rinsing( + G, from_vessel_id, to_vessel_id, volume, amount, time, viscous, + rinsing_solvent, rinsing_volume, rinsing_repeats, solid, + flowrate, transfer_flowrate, rate_spec, event, through, **kwargs + ) + + # 为每个动作添加唯一标识 + for i, action in enumerate(result): + if isinstance(action, dict): + action['_protocol_id'] = protocol_id + action['_action_sequence'] = i + action['_timestamp'] = time_module.time() + + debug_print(f"📊 协议 {protocol_id} 生成完成,共 {len(result)} 个动作") + return result + +# 保持原有的同步版本兼容性 +def generate_pump_protocol_with_rinsing( + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + volume: float = 0.0, + amount: str = "", + time: float = 0.0, + viscous: bool = False, + rinsing_solvent: str = "", + rinsing_volume: float = 0.0, + rinsing_repeats: int = 0, + solid: bool = False, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, + rate_spec: str = "", + event: str = "", + through: str = "", + **kwargs +) -> List[Dict[str, Any]]: + """ + 原有的同步版本,添加防冲突机制 + """ + + # 添加执行锁,防止并发调用 + import threading + if not hasattr(generate_pump_protocol_with_rinsing, '_lock'): + generate_pump_protocol_with_rinsing._lock = threading.Lock() + + with generate_pump_protocol_with_rinsing._lock: + debug_print("=" * 60) + debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (同步版本)") + debug_print(f" 📍 路径: {from_vessel_id} -> {to_vessel_id}") + debug_print(f" 🕐 时间戳: {time_module.time()}") + debug_print(f" 🔒 获得执行锁") + debug_print("=" * 60) + + # 短暂延迟,避免快速重复调用 + time_module.sleep(0.01) + + debug_print("🔍 步骤1: 开始体积处理...") + + # 1. 处理体积参数 + final_volume = volume + debug_print(f"📋 初始设置: final_volume = {final_volume}") + + # 🔧 修复:如果volume为0(ROS2传入的空值),从容器读取实际体积 + if volume == 0.0: + debug_print("🎯 检测到 volume=0.0,开始自动体积检测...") + + # 直接从源容器读取实际体积 + actual_volume = get_vessel_liquid_volume(G, from_vessel_id) + debug_print(f"📖 从容器 '{from_vessel_id}' 读取到体积: {actual_volume}mL") + + if actual_volume > 0: + final_volume = actual_volume + debug_print(f"✅ 成功设置体积为: {final_volume}mL") + else: + final_volume = 10.0 # 如果读取失败,使用默认值 + logger.warning(f"⚠️ 无法从容器读取体积,使用默认值: {final_volume}mL") + else: + debug_print(f"📌 体积非零,直接使用: {final_volume}mL") + + # 处理 amount 参数 + if amount and amount.strip(): + debug_print(f"🔍 检测到 amount 参数: '{amount}',开始解析...") + parsed_volume = _parse_amount_to_volume(amount) + debug_print(f"📖 从 amount 解析得到体积: {parsed_volume}mL") + + if parsed_volume > 0: + final_volume = parsed_volume + debug_print(f"✅ 使用从 amount 解析的体积: {final_volume}mL") + elif parsed_volume == 0.0 and amount.lower().strip() == "all": + debug_print("🎯 检测到 amount='all',从容器读取全部体积...") + actual_volume = get_vessel_liquid_volume(G, from_vessel_id) + if actual_volume > 0: + final_volume = actual_volume + debug_print(f"✅ amount='all',设置体积为: {final_volume}mL") + + # 最终体积验证 + debug_print(f"🔍 步骤2: 最终体积验证...") + if final_volume <= 0: + logger.error(f"❌ 体积无效: {final_volume}mL") + final_volume = 10.0 + logger.warning(f"⚠️ 强制设置为默认值: {final_volume}mL") + + debug_print(f"✅ 最终确定体积: {final_volume}mL") + + # 2. 处理流速参数 + debug_print(f"🔍 步骤3: 处理流速参数...") + debug_print(f" - 原始 flowrate: {flowrate}") + debug_print(f" - 原始 transfer_flowrate: {transfer_flowrate}") + + final_flowrate = flowrate if flowrate > 0 else 2.5 + final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 + + if flowrate <= 0: + logger.warning(f"⚠️ flowrate <= 0,修正为: {final_flowrate}mL/s") + if transfer_flowrate <= 0: + logger.warning(f"⚠️ transfer_flowrate <= 0,修正为: {final_transfer_flowrate}mL/s") + + debug_print(f"✅ 修正后流速: flowrate={final_flowrate}mL/s, transfer_flowrate={final_transfer_flowrate}mL/s") + + # 3. 根据时间计算流速 + if time > 0 and final_volume > 0: + debug_print(f"🔍 步骤4: 根据时间计算流速...") + calculated_flowrate = final_volume / time + debug_print(f" - 计算得到流速: {calculated_flowrate}mL/s") + + if flowrate <= 0 or flowrate == 2.5: + final_flowrate = min(calculated_flowrate, 10.0) + debug_print(f" - 调整 flowrate 为: {final_flowrate}mL/s") + if transfer_flowrate <= 0 or transfer_flowrate == 0.5: + final_transfer_flowrate = min(calculated_flowrate, 5.0) + debug_print(f" - 调整 transfer_flowrate 为: {final_transfer_flowrate}mL/s") + + # 4. 根据速度规格调整 + if rate_spec: + debug_print(f"🔍 步骤5: 根据速度规格调整...") + debug_print(f" - 速度规格: '{rate_spec}'") + + if rate_spec == "dropwise": + final_flowrate = min(final_flowrate, 0.1) + final_transfer_flowrate = min(final_transfer_flowrate, 0.1) + debug_print(f" - dropwise模式,流速调整为: {final_flowrate}mL/s") + elif rate_spec == "slowly": + final_flowrate = min(final_flowrate, 0.5) + final_transfer_flowrate = min(final_transfer_flowrate, 0.3) + debug_print(f" - slowly模式,流速调整为: {final_flowrate}mL/s") + elif rate_spec == "quickly": + final_flowrate = max(final_flowrate, 5.0) + final_transfer_flowrate = max(final_transfer_flowrate, 2.0) + debug_print(f" - quickly模式,流速调整为: {final_flowrate}mL/s") + + # # 5. 处理冲洗参数 + # debug_print(f"🔍 步骤6: 处理冲洗参数...") + # final_rinsing_solvent = rinsing_solvent + # final_rinsing_volume = rinsing_volume if rinsing_volume > 0 else 5.0 + # final_rinsing_repeats = rinsing_repeats if rinsing_repeats > 0 else 2 + + # if rinsing_volume <= 0: + # logger.warning(f"⚠️ rinsing_volume <= 0,修正为: {final_rinsing_volume}mL") + # if rinsing_repeats <= 0: + # logger.warning(f"⚠️ rinsing_repeats <= 0,修正为: {final_rinsing_repeats}次") + + # # 根据物理属性调整冲洗参数 + # if viscous or solid: + # final_rinsing_repeats = max(final_rinsing_repeats, 3) + # final_rinsing_volume = max(final_rinsing_volume, 10.0) + # debug_print(f"🧪 粘稠/固体物质,调整冲洗参数:{final_rinsing_repeats}次,{final_rinsing_volume}mL") + + # 参数总结 + debug_print("📊 最终参数总结:") + debug_print(f" - 体积: {final_volume}mL") + debug_print(f" - 流速: {final_flowrate}mL/s") + debug_print(f" - 转移流速: {final_transfer_flowrate}mL/s") + # debug_print(f" - 冲洗溶剂: '{final_rinsing_solvent}'") + # debug_print(f" - 冲洗体积: {final_rinsing_volume}mL") + # debug_print(f" - 冲洗次数: {final_rinsing_repeats}次") + + # ========== 执行基础转移 ========== + + debug_print("🔧 步骤7: 开始执行基础转移...") + + try: + debug_print(f" - 调用 generate_pump_protocol...") + debug_print(f" - 参数: G, '{from_vessel_id}', '{to_vessel_id}', {final_volume}, {final_flowrate}, {final_transfer_flowrate}") + + pump_action_sequence = generate_pump_protocol( + G, from_vessel_id, to_vessel_id, final_volume, + final_flowrate, final_transfer_flowrate + ) + + debug_print(f" - generate_pump_protocol 返回结果:") + debug_print(f" - 动作序列长度: {len(pump_action_sequence)}") + debug_print(f" - 动作序列是否为空: {len(pump_action_sequence) == 0}") + + if not pump_action_sequence: + debug_print("❌ 基础转移协议生成为空,可能是路径问题") + debug_print(f" - 源容器存在: {from_vessel_id in G.nodes()}") + debug_print(f" - 目标容器存在: {to_vessel_id in G.nodes()}") + + if from_vessel_id in G.nodes() and to_vessel_id in G.nodes(): + try: + path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) + debug_print(f" - 路径存在: {path}") + except Exception as path_error: + debug_print(f" - 无法找到路径: {str(path_error)}") + + return [ + { + "device_id": "system", + "action_name": "log_message", + "action_kwargs": { + "message": f"⚠️ 路径问题,无法转移: {final_volume}mL 从 {from_vessel_id} 到 {to_vessel_id}" + } + } + ] + + debug_print(f"✅ 基础转移生成了 {len(pump_action_sequence)} 个动作") + + # 打印前几个动作用于调试 + if len(pump_action_sequence) > 0: + debug_print("🔍 前几个动作预览:") + for i, action in enumerate(pump_action_sequence[:3]): + debug_print(f" 动作 {i+1}: {action}") + if len(pump_action_sequence) > 3: + debug_print(f" ... 还有 {len(pump_action_sequence) - 3} 个动作") + + except Exception as e: + debug_print(f"❌ 基础转移失败: {str(e)}") + import traceback + debug_print(f"详细错误: {traceback.format_exc()}") + return [ + { + "device_id": "system", + "action_name": "log_message", + "action_kwargs": { + "message": f"❌ 转移失败: {final_volume}mL 从 {from_vessel_id} 到 {to_vessel_id}, 错误: {str(e)}" + } + } + ] + + # ========== 执行冲洗操作 ========== + + # debug_print("🔧 步骤8: 检查冲洗操作...") + + # if final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0: + # debug_print(f"🧽 开始冲洗操作,溶剂: '{final_rinsing_solvent}'") + + # try: + # if final_rinsing_solvent.strip() != "air": + # debug_print(" - 执行液体冲洗...") + # rinsing_actions = _generate_rinsing_sequence( + # G, from_vessel_id, to_vessel_id, final_rinsing_solvent, + # final_rinsing_volume, final_rinsing_repeats, + # final_flowrate, final_transfer_flowrate + # ) + # pump_action_sequence.extend(rinsing_actions) + # debug_print(f" - 添加了 {len(rinsing_actions)} 个冲洗动作") + # else: + # debug_print(" - 执行空气冲洗...") + # air_rinsing_actions = _generate_air_rinsing_sequence( + # G, from_vessel_id, to_vessel_id, final_rinsing_volume, final_rinsing_repeats, + # final_flowrate, final_transfer_flowrate + # ) + # pump_action_sequence.extend(air_rinsing_actions) + # debug_print(f" - 添加了 {len(air_rinsing_actions)} 个空气冲洗动作") + # except Exception as e: + # debug_print(f"⚠️ 冲洗操作失败: {str(e)},跳过冲洗") + # else: + # debug_print(f"⏭️ 跳过冲洗操作") + # debug_print(f" - 溶剂: '{final_rinsing_solvent}'") + # debug_print(f" - 次数: {final_rinsing_repeats}") + # debug_print(f" - 条件满足: {bool(final_rinsing_solvent and final_rinsing_solvent.strip() and final_rinsing_repeats > 0)}") + + # ========== 最终结果 ========== + + debug_print("=" * 60) + debug_print(f"🎉 PUMP_TRANSFER: 协议生成完成") + debug_print(f" 📊 总动作数: {len(pump_action_sequence)}") + debug_print(f" 📋 最终体积: {final_volume}mL") + debug_print(f" 🚀 执行路径: {from_vessel_id} -> {to_vessel_id}") + + # 最终验证 + if len(pump_action_sequence) == 0: + debug_print("🚨 协议生成结果为空!这是异常情况") + return [ + { + "device_id": "system", + "action_name": "log_message", + "action_kwargs": { + "message": f"🚨 协议生成失败: 无法生成任何动作序列" + } + } + ] + + debug_print("=" * 60) + return pump_action_sequence + + +async def generate_pump_protocol_with_rinsing_async( + G: nx.DiGraph, + from_vessel_id: str, + to_vessel_id: str, + volume: float = 0.0, + amount: str = "", + time: float = 0.0, + viscous: bool = False, + rinsing_solvent: str = "", + rinsing_volume: float = 0.0, + rinsing_repeats: int = 0, + solid: bool = False, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, + rate_spec: str = "", + event: str = "", + through: str = "", + **kwargs +) -> List[Dict[str, Any]]: + """ + 异步版本的泵转移协议生成器,避免并发问题 + """ + debug_print("=" * 60) + debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (异步版本)") + debug_print(f" 📍 路径: {from_vessel_id} -> {to_vessel_id}") + debug_print(f" 🕐 时间戳: {time_module.time()}") + debug_print("=" * 60) + + # 添加唯一标识符 + protocol_id = f"pump_transfer_{int(time_module.time() * 1000000)}" + debug_print(f"📋 协议ID: {protocol_id}") + + # 调用原有的同步版本 + result = generate_pump_protocol_with_rinsing( + G, from_vessel_id, to_vessel_id, volume, amount, time, viscous, + rinsing_solvent, rinsing_volume, rinsing_repeats, solid, + flowrate, transfer_flowrate, rate_spec, event, through, **kwargs + ) + + # 为每个动作添加唯一标识 + for i, action in enumerate(result): + if isinstance(action, dict): + action['_protocol_id'] = protocol_id + action['_action_sequence'] = i + action['_timestamp'] = time_module.time() + + debug_print(f"📊 协议 {protocol_id} 生成完成,共 {len(result)} 个动作") + return result + +# 保持原有的同步版本兼容性 +def generate_pump_protocol_with_rinsing( + G: nx.DiGraph, + from_vessel: dict, + to_vessel: dict, + volume: float = 0.0, + amount: str = "", + time: float = 0.0, + viscous: bool = False, + rinsing_solvent: str = "", + rinsing_volume: float = 0.0, + rinsing_repeats: int = 0, + solid: bool = False, + flowrate: float = 2.5, + transfer_flowrate: float = 0.5, + rate_spec: str = "", + event: str = "", + through: str = "", + **kwargs +) -> List[Dict[str, Any]]: + """ + 原有的同步版本,添加防冲突机制 + """ + from_vessel_id, _ = get_vessel(from_vessel) + to_vessel_id, _ = get_vessel(to_vessel) + + # 添加执行锁,防止并发调用 + import threading + if not hasattr(generate_pump_protocol_with_rinsing, '_lock'): + generate_pump_protocol_with_rinsing._lock = threading.Lock() + + with generate_pump_protocol_with_rinsing._lock: + debug_print("=" * 60) + debug_print(f"PUMP_TRANSFER: 🚀 开始生成协议 (同步版本)") + debug_print(f" 📍 路径: {from_vessel_id} -> {to_vessel_id}") + debug_print(f" 🕐 时间戳: {time_module.time()}") + debug_print(f" 🔒 获得执行锁") + debug_print("=" * 60) + + # 短暂延迟,避免快速重复调用 + time_module.sleep(0.01) + + debug_print("🔍 步骤1: 开始体积处理...") + + # 1. 处理体积参数 + final_volume = volume + debug_print(f"📋 初始设置: final_volume = {final_volume}") + + # 🔧 修复:如果volume为0(ROS2传入的空值),从容器读取实际体积 + if volume == 0.0: + debug_print("🎯 检测到 volume=0.0,开始自动体积检测...") + + # 直接从源容器读取实际体积 + actual_volume = get_vessel_liquid_volume(G, from_vessel_id) + debug_print(f"📖 从容器 '{from_vessel_id}' 读取到体积: {actual_volume}mL") + + if actual_volume > 0: + final_volume = actual_volume + debug_print(f"✅ 成功设置体积为: {final_volume}mL") + else: + final_volume = 10.0 # 如果读取失败,使用默认值 + logger.warning(f"⚠️ 无法从容器读取体积,使用默认值: {final_volume}mL") + else: + debug_print(f"📌 体积非零,直接使用: {final_volume}mL") + + # 处理 amount 参数 + if amount and amount.strip(): + debug_print(f"🔍 检测到 amount 参数: '{amount}',开始解析...") + parsed_volume = _parse_amount_to_volume(amount) + debug_print(f"📖 从 amount 解析得到体积: {parsed_volume}mL") + + if parsed_volume > 0: + final_volume = parsed_volume + debug_print(f"✅ 使用从 amount 解析的体积: {final_volume}mL") + elif parsed_volume == 0.0 and amount.lower().strip() == "all": + debug_print("🎯 检测到 amount='all',从容器读取全部体积...") + actual_volume = get_vessel_liquid_volume(G, from_vessel_id) + if actual_volume > 0: + final_volume = actual_volume + debug_print(f"✅ amount='all',设置体积为: {final_volume}mL") + + # 最终体积验证 + debug_print(f"🔍 步骤2: 最终体积验证...") + if final_volume <= 0: + logger.error(f"❌ 体积无效: {final_volume}mL") + final_volume = 10.0 + logger.warning(f"⚠️ 强制设置为默认值: {final_volume}mL") + + debug_print(f"✅ 最终确定体积: {final_volume}mL") + + # 2. 处理流速参数 + debug_print(f"🔍 步骤3: 处理流速参数...") + debug_print(f" - 原始 flowrate: {flowrate}") + debug_print(f" - 原始 transfer_flowrate: {transfer_flowrate}") + + final_flowrate = flowrate if flowrate > 0 else 2.5 + final_transfer_flowrate = transfer_flowrate if transfer_flowrate > 0 else 0.5 + + if flowrate <= 0: + logger.warning(f"⚠️ flowrate <= 0,修正为: {final_flowrate}mL/s") + if transfer_flowrate <= 0: + logger.warning(f"⚠️ transfer_flowrate <= 0,修正为: {final_transfer_flowrate}mL/s") + + debug_print(f"✅ 修正后流速: flowrate={final_flowrate}mL/s, transfer_flowrate={final_transfer_flowrate}mL/s") + + # 3. 根据时间计算流速 + if time > 0 and final_volume > 0: + debug_print(f"🔍 步骤4: 根据时间计算流速...") + calculated_flowrate = final_volume / time + debug_print(f" - 计算得到流速: {calculated_flowrate}mL/s") + + if flowrate <= 0 or flowrate == 2.5: + final_flowrate = min(calculated_flowrate, 10.0) + debug_print(f" - 调整 flowrate 为: {final_flowrate}mL/s") + if transfer_flowrate <= 0 or transfer_flowrate == 0.5: + final_transfer_flowrate = min(calculated_flowrate, 5.0) + debug_print(f" - 调整 transfer_flowrate 为: {final_transfer_flowrate}mL/s") + + # 4. 根据速度规格调整 + if rate_spec: + debug_print(f"🔍 步骤5: 根据速度规格调整...") + debug_print(f" - 速度规格: '{rate_spec}'") + + if rate_spec == "dropwise": + final_flowrate = min(final_flowrate, 0.1) + final_transfer_flowrate = min(final_transfer_flowrate, 0.1) + debug_print(f" - dropwise模式,流速调整为: {final_flowrate}mL/s") + elif rate_spec == "slowly": + final_flowrate = min(final_flowrate, 0.5) + final_transfer_flowrate = min(final_transfer_flowrate, 0.3) + debug_print(f" - slowly模式,流速调整为: {final_flowrate}mL/s") + elif rate_spec == "quickly": + final_flowrate = max(final_flowrate, 5.0) + final_transfer_flowrate = max(final_transfer_flowrate, 2.0) + debug_print(f" - quickly模式,流速调整为: {final_flowrate}mL/s") + + # # 5. 处理冲洗参数 + # debug_print(f"🔍 步骤6: 处理冲洗参数...") + # final_rinsing_solvent = rinsing_solvent + # final_rinsing_volume = rinsing_volume if rinsing_volume > 0 else 5.0 + # final_rinsing_repeats = rinsing_repeats if rinsing_repeats > 0 else 2 + + # if rinsing_volume <= 0: + # logger.warning(f"⚠️ rinsing_volume <= 0,修正为: {final_rinsing_volume}mL") + # if rinsing_repeats <= 0: + # logger.warning(f"⚠️ rinsing_repeats <= 0,修正为: {final_rinsing_repeats}次") + + # # 根据物理属性调整冲洗参数 + # if viscous or solid: + # final_rinsing_repeats = max(final_rinsing_repeats, 3) + # final_rinsing_volume = max(final_rinsing_volume, 10.0) + # debug_print(f"🧪 粘稠/固体物质,调整冲洗参数:{final_rinsing_repeats}次,{final_rinsing_volume}mL") + + try: + pump_action_sequence = generate_pump_protocol( + G, from_vessel_id, to_vessel_id, final_volume, + flowrate, transfer_flowrate + ) + + # 为每个动作添加唯一标识 + # for i, action in enumerate(pump_action_sequence): + # if isinstance(action, dict): + # action['_protocol_id'] = protocol_id + # action['_action_sequence'] = i + # elif isinstance(action, list): + # for j, sub_action in enumerate(action): + # if isinstance(sub_action, dict): + # sub_action['_protocol_id'] = protocol_id + # sub_action['_action_sequence'] = f"{i}_{j}" + # + # debug_print(f"📊 协议 {protocol_id} 生成完成,共 {len(pump_action_sequence)} 个动作") + debug_print(f"🔓 释放执行锁") + return pump_action_sequence + + except Exception as e: + traceback.print_exc() + logger.error(f"❌ 协议生成失败: {str(e)}") + return [ + { + "device_id": "system", + "action_name": "log_message", + "action_kwargs": { + "message": f"❌ 协议生成失败: {str(e)}" + } + } + ] + def _parse_amount_to_volume(amount: str) -> float: """解析 amount 字符串为体积""" debug_print(f"🔍 解析 amount: '{amount}'") - + if not amount: debug_print(" - amount 为空,返回 0.0") return 0.0 @@ -789,7 +1740,7 @@ def _parse_amount_to_volume(amount: str) -> float: import re numbers = re.findall(r'[\d.]+', amount) debug_print(f" - 提取到的数字: {numbers}") - + if numbers: volume = float(numbers[0]) debug_print(f" - 基础体积: {volume}") @@ -814,21 +1765,14 @@ def _parse_amount_to_volume(amount: str) -> float: return 0.0 -def _generate_rinsing_sequence( - G: nx.DiGraph, - from_vessel_id: str, - to_vessel_id: str, - rinsing_solvent: str, - rinsing_volume: float, - rinsing_repeats: int, - flowrate: float, - transfer_flowrate: float -) -> List[Dict[str, Any]]: +def _generate_rinsing_sequence(G: nx.DiGraph, from_vessel_id: str, to_vessel_id: str, + rinsing_solvent: str, rinsing_volume: float, + rinsing_repeats: int, flowrate: float, + transfer_flowrate: float) -> List[Dict[str, Any]]: """生成冲洗动作序列""" rinsing_actions = [] try: - shortest_path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) shortest_path = nx.shortest_path(G, source=from_vessel_id, target=to_vessel_id) pump_backbone = shortest_path[1:-1] @@ -859,32 +1803,27 @@ def _generate_rinsing_sequence( # 清洗泵系统 rinsing_actions.extend( - generate_pump_protocol(G, solvent_vessel, pump_backbone[0], min_transfer_volume, flowrate, - transfer_flowrate) + generate_pump_protocol(G, solvent_vessel, pump_backbone[0], min_transfer_volume, flowrate, transfer_flowrate) ) if len(pump_backbone) > 1: rinsing_actions.extend( - generate_pump_protocol(G, pump_backbone[0], pump_backbone[-1], min_transfer_volume, flowrate, - transfer_flowrate) + generate_pump_protocol(G, pump_backbone[0], pump_backbone[-1], min_transfer_volume, flowrate, transfer_flowrate) ) # 排到废液容器 if waste_vessel in G.nodes(): rinsing_actions.extend( - generate_pump_protocol(G, pump_backbone[-1], waste_vessel, min_transfer_volume, flowrate, - transfer_flowrate) + generate_pump_protocol(G, pump_backbone[-1], waste_vessel, min_transfer_volume, flowrate, transfer_flowrate) ) # 第一种冲洗溶剂稀释源容器和目标容器 if solvent == rinsing_solvents[0]: rinsing_actions.extend( - generate_pump_protocol(G, solvent_vessel, from_vessel_id, rinsing_volume, flowrate, - transfer_flowrate) + generate_pump_protocol(G, solvent_vessel, from_vessel_id, rinsing_volume, flowrate, transfer_flowrate) ) rinsing_actions.extend( generate_pump_protocol(G, solvent_vessel, to_vessel_id, rinsing_volume, flowrate, transfer_flowrate) - generate_pump_protocol(G, solvent_vessel, to_vessel_id, rinsing_volume, flowrate, transfer_flowrate) ) except Exception as e: @@ -894,8 +1833,8 @@ def _generate_rinsing_sequence( def _generate_air_rinsing_sequence(G: nx.DiGraph, from_vessel_id: str, to_vessel_id: str, - rinsing_volume: float, repeats: int, - flowrate: float, transfer_flowrate: float) -> List[Dict[str, Any]]: + rinsing_volume: float, repeats: int, + flowrate: float, transfer_flowrate: float) -> List[Dict[str, Any]]: """生成空气冲洗序列""" air_rinsing_actions = [] @@ -909,16 +1848,14 @@ def _generate_air_rinsing_sequence(G: nx.DiGraph, from_vessel_id: str, to_vessel # 空气冲洗源容器 air_rinsing_actions.extend( generate_pump_protocol(G, air_vessel, from_vessel_id, rinsing_volume, flowrate, transfer_flowrate) - generate_pump_protocol(G, air_vessel, from_vessel_id, rinsing_volume, flowrate, transfer_flowrate) ) # 空气冲洗目标容器 air_rinsing_actions.extend( generate_pump_protocol(G, air_vessel, to_vessel_id, rinsing_volume, flowrate, transfer_flowrate) - generate_pump_protocol(G, air_vessel, to_vessel_id, rinsing_volume, flowrate, transfer_flowrate) ) except Exception as e: logger.warning(f"空气冲洗失败: {str(e)}") - return air_rinsing_actions \ No newline at end of file + return air_rinsing_actions