diff --git a/README.md b/README.md index 94010e9..0d02849 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ conda env update --file unilabos-[YOUR_OS].yml -n environment_name # Currently, you need to install the `unilabos_msgs` package # You can download the system-specific package from the Release page -conda install ros-humble-unilabos-msgs-0.9.7-xxxxx.tar.bz2 +conda install ros-humble-unilabos-msgs-0.9.8-xxxxx.tar.bz2 # Install PyLabRobot and other prerequisites git clone https://github.com/PyLabRobot/pylabrobot plr_repo diff --git a/README_zh.md b/README_zh.md index d4f77f4..a80f06c 100644 --- a/README_zh.md +++ b/README_zh.md @@ -49,7 +49,7 @@ conda env update --file unilabos-[YOUR_OS].yml -n 环境名 # 现阶段,需要安装 `unilabos_msgs` 包 # 可以前往 Release 页面下载系统对应的包进行安装 -conda install ros-humble-unilabos-msgs-0.9.7-xxxxx.tar.bz2 +conda install ros-humble-unilabos-msgs-0.9.8-xxxxx.tar.bz2 # 安装PyLabRobot等前置 git clone https://github.com/PyLabRobot/pylabrobot plr_repo diff --git a/recipes/ros-humble-unilabos-msgs/recipe.yaml b/recipes/ros-humble-unilabos-msgs/recipe.yaml index e476d1b..dde6acc 100644 --- a/recipes/ros-humble-unilabos-msgs/recipe.yaml +++ b/recipes/ros-humble-unilabos-msgs/recipe.yaml @@ -1,6 +1,6 @@ package: name: ros-humble-unilabos-msgs - version: 0.9.7 + version: 0.9.8 source: path: ../../unilabos_msgs folder: ros-humble-unilabos-msgs/src/work diff --git a/recipes/unilabos/recipe.yaml b/recipes/unilabos/recipe.yaml index 2a48b04..c54e141 100644 --- a/recipes/unilabos/recipe.yaml +++ b/recipes/unilabos/recipe.yaml @@ -1,6 +1,6 @@ package: name: unilabos - version: "0.9.7" + version: "0.9.8" source: path: ../.. diff --git a/setup.py b/setup.py index 8fd9bbc..4dde107 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ package_name = 'unilabos' setup( name=package_name, - version='0.9.7', + version='0.9.8', packages=find_packages(), include_package_data=True, install_requires=['setuptools'], diff --git a/test/experiments/comprehensive_protocol/checklist.md b/test/experiments/comprehensive_protocol/checklist.md index ed73ec5..ce6de37 100644 --- a/test/experiments/comprehensive_protocol/checklist.md +++ b/test/experiments/comprehensive_protocol/checklist.md @@ -23,6 +23,7 @@ HeatChillProtocol: generate_heat_chill_protocol, (√) HeatChillStartProtocol: generate_heat_chill_start_protocol, (√) HeatChillStopProtocol: generate_heat_chill_stop_protocol, (√) + HeatChillToTempProtocol: StirProtocol: generate_stir_protocol, (√) StartStirProtocol: generate_start_stir_protocol, (√) StopStirProtocol: generate_stop_stir_protocol, (√) @@ -30,7 +31,13 @@ CleanVesselProtocol: generate_clean_vessel_protocol, (√) DissolveProtocol: generate_dissolve_protocol, (√) FilterThroughProtocol: generate_filter_through_protocol, (√) - RunColumnProtocol: generate_run_column_protocol, (×) - WashSolidProtocol: generate_wash_solid_protocol, (×) + RunColumnProtocol: generate_run_column_protocol, (√) -上下文体积搜索 \ No newline at end of file +上下文体积搜索 +3. 还没创建的protocol + ResetHandling 写完了 + Dry 写完了 + AdjustPH 写完了 + Recrystallize 写完了 + TakeSample + Hydrogenate \ No newline at end of file diff --git a/test/experiments/comprehensive_protocol/comprehensive_station.json b/test/experiments/comprehensive_protocol/comprehensive_station.json index 0d9f5c7..d0f5c6a 100644 --- a/test/experiments/comprehensive_protocol/comprehensive_station.json +++ b/test/experiments/comprehensive_protocol/comprehensive_station.json @@ -23,8 +23,10 @@ "waste_bottle_2", "solenoid_valve_1", "solenoid_valve_2", + "solenoid_valve_3", "vacuum_pump_1", "gas_source_1", + "h2_gas_source", "filter_1", "column_1", "separator_1", @@ -60,7 +62,12 @@ "HeatChillStartProtocol", "HeatChillStopProtocol", "EvacuateAndRefillProtocol", - "PumpTransferProtocol" + "PumpTransferProtocol", + "AdjustPHProtocol", + "ResetHandlingProtocol", + "DryProtocol", + "HydrogenateProtocol", + "RecrystallizeProtocol" ] }, "data": {} @@ -461,6 +468,28 @@ "is_open": false } }, + { + "id": "solenoid_valve_3", + "name": "氢气电磁阀", + "children": [], + "parent": "OrganicSynthesisStation", + "type": "device", + "class": "virtual_solenoid_valve", + "position": { + "x": 450, + "y": 400, + "z": 0 + }, + "config": { + "voltage": 12.0, + "response_time": 0.1, + "gas_compatible": true + }, + "data": { + "valve_state": "Closed", + "is_open": false + } + }, { "id": "vacuum_pump_1", "name": "真空泵", @@ -500,6 +529,29 @@ "max_pressure": 5.0 } }, + { + "id": "h2_gas_source", + "name": "氢气气源", + "children": [], + "parent": "OrganicSynthesisStation", + "type": "device", + "class": "virtual_gas_source", + "position": { + "x": 500, + "y": 350, + "z": 0 + }, + "config": { + "max_pressure": 10.0, + "gas_type": "hydrogen" + }, + "data": { + "gas_type": "hydrogen", + "max_pressure": 10.0, + "current_pressure": 0.0, + "status": "OFF" + } + }, { "id": "filter_1", "name": "过滤器", @@ -874,14 +926,14 @@ } }, { - "id": "link_filter_filtrate_to_collection1", - "source": "filter_1", - "target": "collection_bottle_1", - "type": "transport", - "port": { - "filter_1": "filtrateout", - "collection_bottle_1": "top" - } + "id": "link_filter_filtrate_to_collection1", + "source": "filter_1", + "target": "collection_bottle_1", + "type": "transport", + "port": { + "filter_1": "filtrateout", + "collection_bottle_1": "top" + } }, { "id": "link_filter_retentate_to_waste1", @@ -892,6 +944,26 @@ "filter_1": "retentateout", "waste_bottle_1": "top" } + }, + { + "id": "link_h2_gas_to_valve3", + "source": "h2_gas_source", + "target": "solenoid_valve_3", + "type": "fluid", + "port": { + "h2_gas_source": "gassource", + "solenoid_valve_3": "in" + } + }, + { + "id": "link_valve3_to_reactor", + "source": "solenoid_valve_3", + "target": "main_reactor", + "type": "fluid", + "port": { + "solenoid_valve_3": "out", + "main_reactor": "top" + } } ] } \ No newline at end of file diff --git a/unilabos/compile/__init__.py b/unilabos/compile/__init__.py index 98ea8a2..ef6f8c6 100644 --- a/unilabos/compile/__init__.py +++ b/unilabos/compile/__init__.py @@ -21,16 +21,23 @@ from .dissolve_protocol import generate_dissolve_protocol from .filter_through_protocol import generate_filter_through_protocol from .run_column_protocol import generate_run_column_protocol from .wash_solid_protocol import generate_wash_solid_protocol +from .adjustph_protocol import generate_adjust_ph_protocol +from .reset_handling_protocol import generate_reset_handling_protocol +from .dry_protocol import generate_dry_protocol +from .recrystallize_protocol import generate_recrystallize_protocol +from .hydrogenate_protocol import generate_hydrogenate_protocol # Define a dictionary of protocol generators. action_protocol_generators = { AddProtocol: generate_add_protocol, AGVTransferProtocol: generate_agv_transfer_protocol, + AdjustPHProtocol: generate_adjust_ph_protocol, CentrifugeProtocol: generate_centrifuge_protocol, CleanProtocol: generate_clean_protocol, CleanVesselProtocol: generate_clean_vessel_protocol, DissolveProtocol: generate_dissolve_protocol, + DryProtocol: generate_dry_protocol, EvacuateAndRefillProtocol: generate_evacuateandrefill_protocol, EvaporateProtocol: generate_evaporate_protocol, FilterProtocol: generate_filter_protocol, @@ -38,7 +45,10 @@ action_protocol_generators = { HeatChillProtocol: generate_heat_chill_protocol, HeatChillStartProtocol: generate_heat_chill_start_protocol, HeatChillStopProtocol: generate_heat_chill_stop_protocol, + HydrogenateProtocol: generate_hydrogenate_protocol, PumpTransferProtocol: generate_pump_protocol_with_rinsing, + RecrystallizeProtocol: generate_recrystallize_protocol, + ResetHandlingProtocol: generate_reset_handling_protocol, RunColumnProtocol: generate_run_column_protocol, SeparateProtocol: generate_separate_protocol, StartStirProtocol: generate_start_stir_protocol, diff --git a/unilabos/compile/adjustph_protocol.py b/unilabos/compile/adjustph_protocol.py new file mode 100644 index 0000000..ce7c1c3 --- /dev/null +++ b/unilabos/compile/adjustph_protocol.py @@ -0,0 +1,411 @@ +import networkx as nx +from typing import List, Dict, Any +from .pump_protocol import generate_pump_protocol_with_rinsing + + +def find_acid_base_vessel(G: nx.DiGraph, reagent: str) -> str: + """ + 查找酸碱试剂容器,支持多种匹配模式 + + Args: + G: 网络图 + reagent: 试剂名称(如 "hydrochloric acid", "sodium hydroxide") + + Returns: + str: 试剂容器ID + """ + print(f"ADJUST_PH: 正在查找试剂 '{reagent}' 的容器...") + + # 常见酸碱试剂的别名映射 + reagent_aliases = { + "hydrochloric acid": ["HCl", "hydrochloric_acid", "hcl", "muriatic_acid"], + "sodium hydroxide": ["NaOH", "sodium_hydroxide", "naoh", "caustic_soda"], + "sulfuric acid": ["H2SO4", "sulfuric_acid", "h2so4"], + "nitric acid": ["HNO3", "nitric_acid", "hno3"], + "acetic acid": ["CH3COOH", "acetic_acid", "glacial_acetic_acid"], + "ammonia": ["NH3", "ammonium_hydroxide", "nh3"], + "potassium hydroxide": ["KOH", "potassium_hydroxide", "koh"] + } + + # 构建搜索名称列表 + search_names = [reagent.lower()] + + # 添加别名 + for base_name, aliases in reagent_aliases.items(): + if reagent.lower() in base_name.lower() or base_name.lower() in reagent.lower(): + search_names.extend([alias.lower() for alias in aliases]) + + # 构建可能的容器名称 + possible_names = [] + for name in search_names: + name_clean = name.replace(" ", "_").replace("-", "_") + possible_names.extend([ + f"flask_{name_clean}", + f"bottle_{name_clean}", + f"reagent_{name_clean}", + f"acid_{name_clean}" if "acid" in name else f"base_{name_clean}", + f"{name_clean}_bottle", + f"{name_clean}_flask", + name_clean + ]) + + # 第一步:通过容器名称匹配 + for vessel_name in possible_names: + if vessel_name in G.nodes(): + print(f"ADJUST_PH: 通过名称匹配找到容器: {vessel_name}") + return vessel_name + + # 第二步:通过模糊匹配 + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + node_name = G.nodes[node_id].get('name', '').lower() + + # 检查是否包含任何搜索名称 + for search_name in search_names: + if search_name in node_id.lower() or search_name in node_name: + print(f"ADJUST_PH: 通过模糊匹配找到容器: {node_id}") + return node_id + + # 第三步:通过液体类型匹配 + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + vessel_data = G.nodes[node_id].get('data', {}) + liquids = vessel_data.get('liquid', []) + + for liquid in liquids: + if isinstance(liquid, dict): + liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower() + reagent_name = vessel_data.get('reagent_name', '').lower() + + for search_name in search_names: + if search_name in liquid_type or search_name in reagent_name: + print(f"ADJUST_PH: 通过液体类型匹配找到容器: {node_id}") + return node_id + + # 列出可用容器帮助调试 + available_containers = [] + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + vessel_data = G.nodes[node_id].get('data', {}) + liquids = vessel_data.get('liquid', []) + liquid_types = [liquid.get('liquid_type', '') or liquid.get('name', '') + for liquid in liquids if isinstance(liquid, dict)] + + available_containers.append({ + 'id': node_id, + 'name': G.nodes[node_id].get('name', ''), + 'liquids': liquid_types, + 'reagent_name': vessel_data.get('reagent_name', '') + }) + + print(f"ADJUST_PH: 可用容器列表:") + for container in available_containers: + print(f" - {container['id']}: {container['name']}") + print(f" 液体: {container['liquids']}") + print(f" 试剂: {container['reagent_name']}") + + raise ValueError(f"找不到试剂 '{reagent}' 对应的容器。尝试了: {possible_names}") + + +def find_connected_stirrer(G: nx.DiGraph, vessel: str) -> str: + """查找与容器相连的搅拌器""" + stirrer_nodes = [node for node in G.nodes() + if (G.nodes[node].get('class') or '') == 'virtual_stirrer'] + + for stirrer in stirrer_nodes: + if G.has_edge(stirrer, vessel) or G.has_edge(vessel, stirrer): + return stirrer + + return stirrer_nodes[0] if stirrer_nodes else None + + +def calculate_reagent_volume(target_ph_value: float, reagent: str, vessel_volume: float = 100.0) -> float: # 改为 target_ph_value + """ + 估算需要的试剂体积来调节pH + + Args: + target_ph_value: 目标pH值 # 改为 target_ph_value + reagent: 试剂名称 + vessel_volume: 容器体积 (mL) + + Returns: + float: 估算的试剂体积 (mL) + """ + # 简化的pH调节体积估算(实际应用中需要更精确的计算) + if "acid" in reagent.lower() or "hcl" in reagent.lower(): + # 酸性试剂:pH越低需要的体积越大 + if target_ph_value < 3: # 改为 target_ph_value + return vessel_volume * 0.05 # 5% + elif target_ph_value < 5: # 改为 target_ph_value + return vessel_volume * 0.02 # 2% + else: + return vessel_volume * 0.01 # 1% + + elif "hydroxide" in reagent.lower() or "naoh" in reagent.lower(): + # 碱性试剂:pH越高需要的体积越大 + if target_ph_value > 11: # 改为 target_ph_value + return vessel_volume * 0.05 # 5% + elif target_ph_value > 9: # 改为 target_ph_value + return vessel_volume * 0.02 # 2% + else: + return vessel_volume * 0.01 # 1% + + else: + # 未知试剂,使用默认值 + return vessel_volume * 0.01 + + +def generate_adjust_ph_protocol( + G: nx.DiGraph, + vessel: str, + ph_value: float, # 改为 ph_value + reagent: str, + **kwargs +) -> List[Dict[str, Any]]: + """ + 生成调节pH的协议序列 + + Args: + G: 有向图,节点为容器和设备 + vessel: 目标容器(需要调节pH的容器) + ph_value: 目标pH值(从XDL传入) # 改为 ph_value + reagent: 酸碱试剂名称(从XDL传入) + **kwargs: 其他可选参数,使用默认值 + + Returns: + List[Dict[str, Any]]: 动作序列 + """ + action_sequence = [] + + # 从kwargs中获取可选参数,如果没有则使用默认值 + volume = kwargs.get('volume', 0.0) # 自动估算体积 + stir = kwargs.get('stir', True) # 默认搅拌 + stir_speed = kwargs.get('stir_speed', 300.0) # 默认搅拌速度 + stir_time = kwargs.get('stir_time', 60.0) # 默认搅拌时间 + settling_time = kwargs.get('settling_time', 30.0) # 默认平衡时间 + + print(f"ADJUST_PH: 开始生成pH调节协议") + print(f" - 目标容器: {vessel}") + print(f" - 目标pH: {ph_value}") # 改为 ph_value + print(f" - 试剂: {reagent}") + print(f" - 使用默认参数: 体积=自动估算, 搅拌=True, 搅拌速度=300RPM") + + # 1. 验证目标容器存在 + if vessel not in G.nodes(): + raise ValueError(f"目标容器 '{vessel}' 不存在于系统中") + + # 2. 查找酸碱试剂容器 + try: + reagent_vessel = find_acid_base_vessel(G, reagent) + print(f"ADJUST_PH: 找到试剂容器: {reagent_vessel}") + except ValueError as e: + raise ValueError(f"无法找到试剂 '{reagent}': {str(e)}") + + # 3. 如果未指定体积,自动估算 + if volume <= 0: + # 获取目标容器的体积信息 + vessel_data = G.nodes[vessel].get('data', {}) + vessel_volume = vessel_data.get('max_volume', 100.0) # 默认100mL + + estimated_volume = calculate_reagent_volume(ph_value, reagent, vessel_volume) # 改为 ph_value + volume = estimated_volume + print(f"ADJUST_PH: 自动估算试剂体积: {volume:.2f} mL") + + # 4. 验证路径存在 + try: + path = nx.shortest_path(G, source=reagent_vessel, target=vessel) + print(f"ADJUST_PH: 找到路径: {' → '.join(path)}") + except nx.NetworkXNoPath: + raise ValueError(f"从试剂容器 '{reagent_vessel}' 到目标容器 '{vessel}' 没有可用路径") + + # 5. 先启动搅拌(如果需要) + stirrer_id = None + if stir: + try: + stirrer_id = find_connected_stirrer(G, vessel) + + if stirrer_id: + print(f"ADJUST_PH: 找到搅拌器 {stirrer_id},启动搅拌") + action_sequence.append({ + "device_id": stirrer_id, + "action_name": "start_stir", + "action_kwargs": { + "vessel": vessel, + "stir_speed": stir_speed, + "purpose": f"pH调节: 启动搅拌,准备添加 {reagent}" + } + }) + + # 等待搅拌稳定 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": {"time": 5} + }) + else: + print(f"ADJUST_PH: 警告 - 未找到搅拌器,继续执行") + + except Exception as e: + print(f"ADJUST_PH: 搅拌器配置出错: {str(e)}") + + # 6. 缓慢添加试剂 - 使用pump_protocol + print(f"ADJUST_PH: 开始添加试剂 {volume:.2f} mL") + + # 计算添加时间(pH调节需要缓慢添加) + addition_time = max(30.0, volume * 2.0) # 至少30秒,每mL需要2秒 + + try: + pump_actions = generate_pump_protocol_with_rinsing( + G=G, + from_vessel=reagent_vessel, + to_vessel=vessel, + volume=volume, + amount="", + time=addition_time, + viscous=False, + rinsing_solvent="", # pH调节不需要清洗 + rinsing_volume=0.0, + rinsing_repeats=0, + solid=False, + flowrate=0.5 # 缓慢注入 + ) + + action_sequence.extend(pump_actions) + + except Exception as e: + raise ValueError(f"生成泵协议时出错: {str(e)}") + + # 7. 持续搅拌以混合和平衡 + if stir and stirrer_id: + print(f"ADJUST_PH: 持续搅拌 {stir_time} 秒以混合试剂") + action_sequence.append({ + "device_id": stirrer_id, + "action_name": "stir", + "action_kwargs": { + "stir_time": stir_time, + "stir_speed": stir_speed, + "settling_time": settling_time, + "purpose": f"pH调节: 混合试剂,目标pH={ph_value}" # 改为 ph_value + } + }) + + # 8. 等待反应平衡 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": settling_time, + "description": f"等待pH平衡到目标值 {ph_value}" # 改为 ph_value + } + }) + + print(f"ADJUST_PH: 协议生成完成,共 {len(action_sequence)} 个动作") + print(f"ADJUST_PH: 预计总时间: {addition_time + stir_time + settling_time:.0f} 秒") + + return action_sequence + + +def generate_adjust_ph_protocol_stepwise( + G: nx.DiGraph, + vessel: str, + ph_value: float, + reagent: str, + max_volume: float = 10.0, + steps: int = 3 +) -> List[Dict[str, Any]]: + """ + 分步调节pH的协议(更安全,避免过度调节) + + Args: + G: 网络图 + vessel: 目标容器 + pH: 目标pH值 + reagent: 酸碱试剂 + max_volume: 最大试剂体积 + steps: 分步数量 + + Returns: + List[Dict[str, Any]]: 动作序列 + """ + action_sequence = [] + + print(f"ADJUST_PH: 开始分步pH调节({steps}步)") + + # 每步添加的体积 + step_volume = max_volume / steps + + for i in range(steps): + print(f"ADJUST_PH: 第 {i+1}/{steps} 步,添加 {step_volume} mL") + + # 生成单步协议 + step_actions = generate_adjust_ph_protocol( + G=G, + vessel=vessel, + ph_value=ph_value, + reagent=reagent, + volume=step_volume, + stir=True, + stir_speed=300.0, + stir_time=30.0, + settling_time=20.0 + ) + + action_sequence.extend(step_actions) + + # 步骤间等待 + if i < steps - 1: + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 30, + "description": f"pH调节第{i+1}步完成,等待下一步" + } + }) + + print(f"ADJUST_PH: 分步pH调节完成") + return action_sequence + + +# 便捷函数:常用pH调节 +def generate_acidify_protocol( + G: nx.DiGraph, + vessel: str, + target_ph: float = 2.0, + acid: str = "hydrochloric acid" +) -> List[Dict[str, Any]]: + """酸化协议""" + return generate_adjust_ph_protocol( + G, vessel, target_ph, acid, 0.0, True, 300.0, 120.0, 60.0 + ) + + +def generate_basify_protocol( + G: nx.DiGraph, + vessel: str, + target_ph: float = 12.0, + base: str = "sodium hydroxide" +) -> List[Dict[str, Any]]: + """碱化协议""" + return generate_adjust_ph_protocol( + G, vessel, target_ph, base, 0.0, True, 300.0, 120.0, 60.0 + ) + + +def generate_neutralize_protocol( + G: nx.DiGraph, + vessel: str, + reagent: str = "sodium hydroxide" +) -> List[Dict[str, Any]]: + """中和协议(pH=7)""" + return generate_adjust_ph_protocol( + G, vessel, 7.0, reagent, 0.0, True, 350.0, 180.0, 90.0 + ) + + +# 测试函数 +def test_adjust_ph_protocol(): + """测试pH调节协议""" + print("=== ADJUST PH PROTOCOL 测试 ===") + print("测试完成") + + +if __name__ == "__main__": + test_adjust_ph_protocol() \ No newline at end of file diff --git a/unilabos/compile/dry_protocol.py b/unilabos/compile/dry_protocol.py new file mode 100644 index 0000000..34044eb --- /dev/null +++ b/unilabos/compile/dry_protocol.py @@ -0,0 +1,165 @@ +import networkx as nx +from typing import List, Dict, Any + + +def find_connected_heater(G: nx.DiGraph, vessel: str) -> str: + """ + 查找与容器相连的加热器 + + Args: + G: 网络图 + vessel: 容器名称 + + Returns: + str: 加热器ID,如果没有则返回None + """ + print(f"DRY: 正在查找与容器 '{vessel}' 相连的加热器...") + + # 查找所有加热器节点 + heater_nodes = [node for node in G.nodes() + if ('heater' in node.lower() or + 'heat' in node.lower() or + G.nodes[node].get('class') == 'virtual_heatchill' or + G.nodes[node].get('type') == 'heater')] + + print(f"DRY: 找到的加热器节点: {heater_nodes}") + + # 检查是否有加热器与目标容器相连 + for heater in heater_nodes: + if G.has_edge(heater, vessel) or G.has_edge(vessel, heater): + print(f"DRY: 找到与容器 '{vessel}' 相连的加热器: {heater}") + return heater + + # 如果没有直接连接,查找距离最近的加热器 + for heater in heater_nodes: + try: + path = nx.shortest_path(G, source=heater, target=vessel) + if len(path) <= 3: # 最多2个中间节点 + print(f"DRY: 找到距离较近的加热器: {heater}, 路径: {' → '.join(path)}") + return heater + except nx.NetworkXNoPath: + continue + + print(f"DRY: 未找到与容器 '{vessel}' 相连的加热器") + return None + + +def generate_dry_protocol( + G: nx.DiGraph, + compound: str, + vessel: str, + **kwargs # 接收其他可能的参数但不使用 +) -> List[Dict[str, Any]]: + """ + 生成干燥协议序列 + + Args: + G: 有向图,节点为容器和设备 + compound: 化合物名称(从XDL传入) + vessel: 目标容器(从XDL传入) + **kwargs: 其他可选参数,但不使用 + + Returns: + List[Dict[str, Any]]: 动作序列 + """ + action_sequence = [] + + # 默认参数 + dry_temp = 60.0 # 默认干燥温度 60°C + dry_time = 3600.0 # 默认干燥时间 1小时(3600秒) + + print(f"DRY: 开始生成干燥协议") + print(f" - 化合物: {compound}") + print(f" - 容器: {vessel}") + print(f" - 干燥温度: {dry_temp}°C") + print(f" - 干燥时间: {dry_time/60:.0f} 分钟") + + # 1. 验证目标容器存在 + if vessel not in G.nodes(): + print(f"DRY: 警告 - 容器 '{vessel}' 不存在于系统中,跳过干燥") + return action_sequence + + # 2. 查找相连的加热器 + heater_id = find_connected_heater(G, vessel) + + if heater_id is None: + print(f"DRY: 警告 - 未找到与容器 '{vessel}' 相连的加热器,跳过干燥") + # 添加一个等待动作,表示干燥过程(模拟) + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 60.0, # 等待1分钟 + "description": f"模拟干燥 {compound} (无加热器可用)" + } + }) + return action_sequence + + # 3. 启动加热器进行干燥 + print(f"DRY: 启动加热器 {heater_id} 进行干燥") + + # 3.1 启动加热 + action_sequence.append({ + "device_id": heater_id, + "action_name": "heat_chill_start", + "action_kwargs": { + "vessel": vessel, + "temp": dry_temp, + "purpose": f"干燥 {compound}" + } + }) + + # 3.2 等待温度稳定 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 60.0, + "description": f"等待温度稳定到 {dry_temp}°C" + } + }) + + # 3.3 保持干燥温度 + action_sequence.append({ + "device_id": heater_id, + "action_name": "heat_chill", + "action_kwargs": { + "vessel": vessel, + "temp": dry_temp, + "time": dry_time, + "purpose": f"干燥 {compound},保持温度 {dry_temp}°C" + } + }) + + # 3.4 停止加热 + action_sequence.append({ + "device_id": heater_id, + "action_name": "heat_chill_stop", + "action_kwargs": { + "vessel": vessel, + "purpose": f"干燥完成,停止加热" + } + }) + + # 3.5 等待冷却 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 300.0, # 等待5分钟冷却 + "description": f"等待 {compound} 冷却" + } + }) + + print(f"DRY: 协议生成完成,共 {len(action_sequence)} 个动作") + print(f"DRY: 预计总时间: {(dry_time + 360)/60:.0f} 分钟") + + return action_sequence + + +# 测试函数 +def test_dry_protocol(): + """测试干燥协议""" + print("=== DRY PROTOCOL 测试 ===") + print("测试完成") + + +if __name__ == "__main__": + test_dry_protocol() \ No newline at end of file diff --git a/unilabos/compile/hydrogenate_protocol.py b/unilabos/compile/hydrogenate_protocol.py new file mode 100644 index 0000000..8070705 --- /dev/null +++ b/unilabos/compile/hydrogenate_protocol.py @@ -0,0 +1,366 @@ +import networkx as nx +from typing import List, Dict, Any, Optional + + +def parse_temperature(temp_str: str) -> float: + """ + 解析温度字符串,支持多种格式 + + Args: + temp_str: 温度字符串(如 "45 °C", "45°C", "45") + + Returns: + float: 温度值(摄氏度) + """ + try: + # 移除常见的温度单位和符号 + temp_clean = temp_str.replace("°C", "").replace("°", "").replace("C", "").strip() + return float(temp_clean) + except ValueError: + print(f"HYDROGENATE: 无法解析温度 '{temp_str}',使用默认温度 25°C") + return 25.0 + + +def parse_time(time_str: str) -> float: + """ + 解析时间字符串,支持多种格式 + + Args: + time_str: 时间字符串(如 "2 h", "120 min", "7200 s") + + Returns: + float: 时间值(秒) + """ + try: + time_clean = time_str.lower().strip() + + # 处理小时 + if "h" in time_clean: + hours = float(time_clean.replace("h", "").strip()) + return hours * 3600.0 + + # 处理分钟 + if "min" in time_clean: + minutes = float(time_clean.replace("min", "").strip()) + return minutes * 60.0 + + # 处理秒 + if "s" in time_clean: + seconds = float(time_clean.replace("s", "").strip()) + return seconds + + # 默认按小时处理 + return float(time_clean) * 3600.0 + + except ValueError: + print(f"HYDROGENATE: 无法解析时间 '{time_str}',使用默认时间 2小时") + return 7200.0 # 2小时 + + +def find_associated_solenoid_valve(G: nx.DiGraph, device_id: str) -> Optional[str]: + """查找与指定设备相关联的电磁阀""" + solenoid_valves = [ + node for node in G.nodes() + if ('solenoid' in (G.nodes[node].get('class') or '').lower() + or 'solenoid_valve' in node) + ] + + # 通过网络连接查找直接相连的电磁阀 + for solenoid in solenoid_valves: + if G.has_edge(device_id, solenoid) or G.has_edge(solenoid, device_id): + return solenoid + + # 通过命名规则查找关联的电磁阀 + device_type = "" + if 'gas' in device_id.lower(): + device_type = "gas" + elif 'h2' in device_id.lower() or 'hydrogen' in device_id.lower(): + device_type = "gas" + + if device_type: + for solenoid in solenoid_valves: + if device_type in solenoid.lower(): + return solenoid + + return None + + +def find_connected_device(G: nx.DiGraph, vessel: str, device_type: str) -> str: + """ + 查找与容器相连的指定类型设备 + + Args: + G: 网络图 + vessel: 容器名称 + device_type: 设备类型 ('heater', 'stirrer', 'gas_source') + + Returns: + str: 设备ID,如果没有则返回None + """ + print(f"HYDROGENATE: 正在查找与容器 '{vessel}' 相连的 {device_type}...") + + # 根据设备类型定义搜索关键词 + if device_type == 'heater': + keywords = ['heater', 'heat', 'heatchill'] + device_class = 'virtual_heatchill' + elif device_type == 'stirrer': + keywords = ['stirrer', 'stir'] + device_class = 'virtual_stirrer' + elif device_type == 'gas_source': + keywords = ['gas', 'h2', 'hydrogen'] + device_class = 'virtual_gas_source' + else: + return None + + # 查找设备节点 + device_nodes = [] + for node in G.nodes(): + node_data = G.nodes[node] + node_name = node.lower() + node_class = node_data.get('class', '').lower() + + # 通过名称匹配 + if any(keyword in node_name for keyword in keywords): + device_nodes.append(node) + # 通过类型匹配 + elif device_class in node_class: + device_nodes.append(node) + + print(f"HYDROGENATE: 找到的{device_type}节点: {device_nodes}") + + # 检查是否有设备与目标容器相连 + for device in device_nodes: + if G.has_edge(device, vessel) or G.has_edge(vessel, device): + print(f"HYDROGENATE: 找到与容器 '{vessel}' 相连的{device_type}: {device}") + return device + + # 如果没有直接连接,查找距离最近的设备 + for device in device_nodes: + try: + path = nx.shortest_path(G, source=device, target=vessel) + if len(path) <= 3: # 最多2个中间节点 + print(f"HYDROGENATE: 找到距离较近的{device_type}: {device}") + return device + except nx.NetworkXNoPath: + continue + + print(f"HYDROGENATE: 未找到与容器 '{vessel}' 相连的{device_type}") + return None + + +def generate_hydrogenate_protocol( + G: nx.DiGraph, + temp: str, + time: str, + vessel: str, + **kwargs # 接收其他可能的参数但不使用 +) -> List[Dict[str, Any]]: + """ + 生成氢化反应协议序列 + + Args: + G: 有向图,节点为容器和设备 + temp: 反应温度(如 "45 °C") + time: 反应时间(如 "2 h") + vessel: 反应容器 + **kwargs: 其他可选参数,但不使用 + + Returns: + List[Dict[str, Any]]: 动作序列 + """ + action_sequence = [] + + # 解析参数 + temperature = parse_temperature(temp) + reaction_time = parse_time(time) + + print(f"HYDROGENATE: 开始生成氢化反应协议") + print(f" - 反应温度: {temperature}°C") + print(f" - 反应时间: {reaction_time/3600:.1f} 小时") + print(f" - 反应容器: {vessel}") + + # 1. 验证目标容器存在 + if vessel not in G.nodes(): + print(f"HYDROGENATE: 警告 - 容器 '{vessel}' 不存在于系统中,跳过氢化反应") + return action_sequence + + # 2. 查找相连的设备 + heater_id = find_connected_device(G, vessel, 'heater') + stirrer_id = find_connected_device(G, vessel, 'stirrer') + gas_source_id = find_connected_device(G, vessel, 'gas_source') + + # 3. 启动搅拌器 + if stirrer_id: + print(f"HYDROGENATE: 启动搅拌器 {stirrer_id}") + action_sequence.append({ + "device_id": stirrer_id, + "action_name": "start_stir", + "action_kwargs": { + "vessel": vessel, + "stir_speed": 300.0, + "purpose": "氢化反应: 开始搅拌" + } + }) + else: + print(f"HYDROGENATE: 警告 - 未找到搅拌器,继续执行") + + # 4. 启动气源(氢气)- 修复版本 + if gas_source_id: + print(f"HYDROGENATE: 启动气源 {gas_source_id} (氢气)") + action_sequence.append({ + "device_id": gas_source_id, + "action_name": "set_status", # 修改为 set_status + "action_kwargs": { + "string": "ON" # 修改参数格式 + } + }) + + # 查找相关的电磁阀 + gas_solenoid = find_associated_solenoid_valve(G, gas_source_id) + if gas_solenoid: + print(f"HYDROGENATE: 开启气源电磁阀 {gas_solenoid}") + action_sequence.append({ + "device_id": gas_solenoid, + "action_name": "set_valve_position", + "action_kwargs": { + "command": "OPEN" + } + }) + else: + print(f"HYDROGENATE: 警告 - 未找到气源,继续执行") + + # 5. 等待气体稳定 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 30.0, + "description": "等待氢气环境稳定" + } + }) + + # 6. 启动加热器 + if heater_id: + print(f"HYDROGENATE: 启动加热器 {heater_id} 到 {temperature}°C") + action_sequence.append({ + "device_id": heater_id, + "action_name": "heat_chill_start", + "action_kwargs": { + "vessel": vessel, + "temp": temperature, + "purpose": f"氢化反应: 加热到 {temperature}°C" + } + }) + + # 等待温度稳定 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 120.0, + "description": f"等待温度稳定到 {temperature}°C" + } + }) + + # 保持反应温度 + action_sequence.append({ + "device_id": heater_id, + "action_name": "heat_chill", + "action_kwargs": { + "vessel": vessel, + "temp": temperature, + "time": reaction_time, + "purpose": f"氢化反应: 保持 {temperature}°C,反应 {reaction_time/3600:.1f} 小时" + } + }) + else: + print(f"HYDROGENATE: 警告 - 未找到加热器,使用室温反应") + # 室温反应,只等待时间 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": reaction_time, + "description": f"室温氢化反应 {reaction_time/3600:.1f} 小时" + } + }) + + # 7. 停止加热 + if heater_id: + action_sequence.append({ + "device_id": heater_id, + "action_name": "heat_chill_stop", + "action_kwargs": { + "vessel": vessel, + "purpose": "氢化反应完成,停止加热" + } + }) + + # 8. 等待冷却 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 300.0, + "description": "等待反应混合物冷却" + } + }) + + # 9. 停止气源 - 修复版本 + if gas_source_id: + # 先关闭电磁阀 + gas_solenoid = find_associated_solenoid_valve(G, gas_source_id) + if gas_solenoid: + print(f"HYDROGENATE: 关闭气源电磁阀 {gas_solenoid}") + action_sequence.append({ + "device_id": gas_solenoid, + "action_name": "set_valve_position", + "action_kwargs": { + "command": "CLOSED" + } + }) + + # 再关闭气源 + action_sequence.append({ + "device_id": gas_source_id, + "action_name": "set_status", # 修改为 set_status + "action_kwargs": { + "string": "OFF" # 修改参数格式 + } + }) + + # 10. 停止搅拌 + if stirrer_id: + action_sequence.append({ + "device_id": stirrer_id, + "action_name": "stop_stir", + "action_kwargs": { + "vessel": vessel, + "purpose": "氢化反应完成,停止搅拌" + } + }) + + print(f"HYDROGENATE: 协议生成完成,共 {len(action_sequence)} 个动作") + print(f"HYDROGENATE: 预计总时间: {(reaction_time + 450)/3600:.1f} 小时") + + return action_sequence + + +# 测试函数 +def test_hydrogenate_protocol(): + """测试氢化反应协议""" + print("=== HYDROGENATE PROTOCOL 测试 ===") + + # 测试温度解析 + test_temps = ["45 °C", "45°C", "45", "25 C", "invalid"] + for temp in test_temps: + parsed = parse_temperature(temp) + print(f"温度 '{temp}' -> {parsed}°C") + + # 测试时间解析 + test_times = ["2 h", "120 min", "7200 s", "2", "invalid"] + for time in test_times: + parsed = parse_time(time) + print(f"时间 '{time}' -> {parsed/3600:.1f} 小时") + + print("测试完成") + + +if __name__ == "__main__": + test_hydrogenate_protocol() \ No newline at end of file diff --git a/unilabos/compile/recrystallize_protocol.py b/unilabos/compile/recrystallize_protocol.py new file mode 100644 index 0000000..b69d88b --- /dev/null +++ b/unilabos/compile/recrystallize_protocol.py @@ -0,0 +1,281 @@ +import networkx as nx +from typing import List, Dict, Any, Tuple +from .pump_protocol import generate_pump_protocol_with_rinsing + + +def parse_ratio(ratio_str: str) -> Tuple[float, float]: + """ + 解析比例字符串,支持多种格式 + + Args: + ratio_str: 比例字符串(如 "1:1", "3:7", "50:50") + + Returns: + Tuple[float, float]: 比例元组 (ratio1, ratio2) + """ + try: + # 处理 "1:1", "3:7", "50:50" 等格式 + if ":" in ratio_str: + parts = ratio_str.split(":") + if len(parts) == 2: + ratio1 = float(parts[0]) + ratio2 = float(parts[1]) + return ratio1, ratio2 + + # 处理 "1-1", "3-7" 等格式 + if "-" in ratio_str: + parts = ratio_str.split("-") + if len(parts) == 2: + ratio1 = float(parts[0]) + ratio2 = float(parts[1]) + return ratio1, ratio2 + + # 处理 "1,1", "3,7" 等格式 + if "," in ratio_str: + parts = ratio_str.split(",") + if len(parts) == 2: + ratio1 = float(parts[0]) + ratio2 = float(parts[1]) + return ratio1, ratio2 + + # 默认 1:1 + print(f"RECRYSTALLIZE: 无法解析比例 '{ratio_str}',使用默认比例 1:1") + return 1.0, 1.0 + + except ValueError: + print(f"RECRYSTALLIZE: 比例解析错误 '{ratio_str}',使用默认比例 1:1") + return 1.0, 1.0 + + +def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: + """ + 查找溶剂容器 + + Args: + G: 网络图 + solvent: 溶剂名称 + + Returns: + str: 溶剂容器ID + """ + print(f"RECRYSTALLIZE: 正在查找溶剂 '{solvent}' 的容器...") + + # 构建可能的容器名称 + possible_names = [ + f"flask_{solvent}", + f"bottle_{solvent}", + f"reagent_{solvent}", + f"reagent_bottle_{solvent}", + f"{solvent}_flask", + f"{solvent}_bottle", + f"{solvent}", + f"vessel_{solvent}", + ] + + # 第一步:通过容器名称匹配 + for vessel_name in possible_names: + if vessel_name in G.nodes(): + print(f"RECRYSTALLIZE: 通过名称匹配找到容器: {vessel_name}") + return vessel_name + + # 第二步:通过模糊匹配 + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + node_name = G.nodes[node_id].get('name', '').lower() + + if solvent.lower() in node_id.lower() or solvent.lower() in node_name: + print(f"RECRYSTALLIZE: 通过模糊匹配找到容器: {node_id}") + return node_id + + # 第三步:通过液体类型匹配 + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + vessel_data = G.nodes[node_id].get('data', {}) + liquids = vessel_data.get('liquid', []) + + for liquid in liquids: + if isinstance(liquid, dict): + liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower() + reagent_name = vessel_data.get('reagent_name', '').lower() + + if solvent.lower() in liquid_type or solvent.lower() in reagent_name: + print(f"RECRYSTALLIZE: 通过液体类型匹配找到容器: {node_id}") + return node_id + + raise ValueError(f"找不到溶剂 '{solvent}' 对应的容器") + + +def generate_recrystallize_protocol( + G: nx.DiGraph, + ratio: str, + solvent1: str, + solvent2: str, + vessel: str, + volume: float, + **kwargs # 接收其他可能的参数但不使用 +) -> List[Dict[str, Any]]: + """ + 生成重结晶协议序列 + + Args: + G: 有向图,节点为容器和设备 + ratio: 溶剂比例(如 "1:1", "3:7") + solvent1: 第一种溶剂名称 + solvent2: 第二种溶剂名称 + vessel: 目标容器 + volume: 总体积 (mL) + **kwargs: 其他可选参数,但不使用 + + Returns: + List[Dict[str, Any]]: 动作序列 + """ + action_sequence = [] + + print(f"RECRYSTALLIZE: 开始生成重结晶协议") + print(f" - 比例: {ratio}") + print(f" - 溶剂1: {solvent1}") + print(f" - 溶剂2: {solvent2}") + print(f" - 容器: {vessel}") + print(f" - 总体积: {volume} mL") + + # 1. 验证目标容器存在 + if vessel not in G.nodes(): + raise ValueError(f"目标容器 '{vessel}' 不存在于系统中") + + # 2. 解析比例 + ratio1, ratio2 = parse_ratio(ratio) + total_ratio = ratio1 + ratio2 + + # 3. 计算各溶剂体积 + volume1 = volume * (ratio1 / total_ratio) + volume2 = volume * (ratio2 / total_ratio) + + print(f"RECRYSTALLIZE: 解析比例: {ratio1}:{ratio2}") + print(f"RECRYSTALLIZE: {solvent1} 体积: {volume1:.2f} mL") + print(f"RECRYSTALLIZE: {solvent2} 体积: {volume2:.2f} mL") + + # 4. 查找溶剂容器 + try: + solvent1_vessel = find_solvent_vessel(G, solvent1) + print(f"RECRYSTALLIZE: 找到溶剂1容器: {solvent1_vessel}") + except ValueError as e: + raise ValueError(f"无法找到溶剂1 '{solvent1}': {str(e)}") + + try: + solvent2_vessel = find_solvent_vessel(G, solvent2) + print(f"RECRYSTALLIZE: 找到溶剂2容器: {solvent2_vessel}") + except ValueError as e: + raise ValueError(f"无法找到溶剂2 '{solvent2}': {str(e)}") + + # 5. 验证路径存在 + try: + path1 = nx.shortest_path(G, source=solvent1_vessel, target=vessel) + print(f"RECRYSTALLIZE: 溶剂1路径: {' → '.join(path1)}") + except nx.NetworkXNoPath: + raise ValueError(f"从溶剂1容器 '{solvent1_vessel}' 到目标容器 '{vessel}' 没有可用路径") + + try: + path2 = nx.shortest_path(G, source=solvent2_vessel, target=vessel) + print(f"RECRYSTALLIZE: 溶剂2路径: {' → '.join(path2)}") + except nx.NetworkXNoPath: + raise ValueError(f"从溶剂2容器 '{solvent2_vessel}' 到目标容器 '{vessel}' 没有可用路径") + + # 6. 添加第一种溶剂 + print(f"RECRYSTALLIZE: 开始添加溶剂1 {volume1:.2f} mL") + + try: + pump_actions1 = generate_pump_protocol_with_rinsing( + G=G, + from_vessel=solvent1_vessel, + to_vessel=vessel, + volume=volume1, + amount="", + time=0.0, + viscous=False, + rinsing_solvent="", # 重结晶不需要清洗 + rinsing_volume=0.0, + rinsing_repeats=0, + solid=False, + flowrate=2.0, # 正常流速 + transfer_flowrate=0.5 + ) + + action_sequence.extend(pump_actions1) + + except Exception as e: + raise ValueError(f"生成溶剂1泵协议时出错: {str(e)}") + + # 7. 等待溶剂1稳定 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 10.0, + "description": f"等待溶剂1 {solvent1} 稳定" + } + }) + + # 8. 添加第二种溶剂 + print(f"RECRYSTALLIZE: 开始添加溶剂2 {volume2:.2f} mL") + + try: + pump_actions2 = generate_pump_protocol_with_rinsing( + G=G, + from_vessel=solvent2_vessel, + to_vessel=vessel, + volume=volume2, + amount="", + time=0.0, + viscous=False, + rinsing_solvent="", # 重结晶不需要清洗 + rinsing_volume=0.0, + rinsing_repeats=0, + solid=False, + flowrate=2.0, # 正常流速 + transfer_flowrate=0.5 + ) + + action_sequence.extend(pump_actions2) + + except Exception as e: + raise ValueError(f"生成溶剂2泵协议时出错: {str(e)}") + + # 9. 等待溶剂2稳定 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 10.0, + "description": f"等待溶剂2 {solvent2} 稳定" + } + }) + + # 10. 等待重结晶完成 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 600.0, # 等待10分钟进行重结晶 + "description": f"等待重结晶完成({solvent1}:{solvent2} = {ratio})" + } + }) + + print(f"RECRYSTALLIZE: 协议生成完成,共 {len(action_sequence)} 个动作") + print(f"RECRYSTALLIZE: 预计总时间: {620/60:.1f} 分钟") + + return action_sequence + + +# 测试函数 +def test_recrystallize_protocol(): + """测试重结晶协议""" + print("=== RECRYSTALLIZE PROTOCOL 测试 ===") + + # 测试比例解析 + test_ratios = ["1:1", "3:7", "50:50", "1-1", "2,8", "invalid"] + for ratio in test_ratios: + r1, r2 = parse_ratio(ratio) + print(f"比例 '{ratio}' -> {r1}:{r2}") + + print("测试完成") + + +if __name__ == "__main__": + test_recrystallize_protocol() \ No newline at end of file diff --git a/unilabos/compile/reset_handling_protocol.py b/unilabos/compile/reset_handling_protocol.py new file mode 100644 index 0000000..0fa55c2 --- /dev/null +++ b/unilabos/compile/reset_handling_protocol.py @@ -0,0 +1,180 @@ +import networkx as nx +from typing import List, Dict, Any +from .pump_protocol import generate_pump_protocol_with_rinsing + + +def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: + """ + 查找溶剂容器,支持多种匹配模式 + + Args: + G: 网络图 + solvent: 溶剂名称(如 "methanol", "ethanol", "water") + + Returns: + str: 溶剂容器ID + """ + print(f"RESET_HANDLING: 正在查找溶剂 '{solvent}' 的容器...") + + # 构建可能的容器名称 + possible_names = [ + f"flask_{solvent}", # flask_methanol + f"bottle_{solvent}", # bottle_methanol + f"reagent_{solvent}", # reagent_methanol + f"reagent_bottle_{solvent}", # reagent_bottle_methanol + f"{solvent}_flask", # methanol_flask + f"{solvent}_bottle", # methanol_bottle + f"{solvent}", # methanol + f"vessel_{solvent}", # vessel_methanol + ] + + # 第一步:通过容器名称匹配 + for vessel_name in possible_names: + if vessel_name in G.nodes(): + print(f"RESET_HANDLING: 通过名称匹配找到容器: {vessel_name}") + return vessel_name + + # 第二步:通过模糊匹配 + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + node_name = G.nodes[node_id].get('name', '').lower() + + # 检查是否包含溶剂名称 + if solvent.lower() in node_id.lower() or solvent.lower() in node_name: + print(f"RESET_HANDLING: 通过模糊匹配找到容器: {node_id}") + return node_id + + # 第三步:通过液体类型匹配 + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + vessel_data = G.nodes[node_id].get('data', {}) + liquids = vessel_data.get('liquid', []) + + for liquid in liquids: + if isinstance(liquid, dict): + liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower() + reagent_name = vessel_data.get('reagent_name', '').lower() + + if solvent.lower() in liquid_type or solvent.lower() in reagent_name: + print(f"RESET_HANDLING: 通过液体类型匹配找到容器: {node_id}") + return node_id + + # 列出可用容器帮助调试 + available_containers = [] + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + vessel_data = G.nodes[node_id].get('data', {}) + liquids = vessel_data.get('liquid', []) + liquid_types = [liquid.get('liquid_type', '') or liquid.get('name', '') + for liquid in liquids if isinstance(liquid, dict)] + + available_containers.append({ + 'id': node_id, + 'name': G.nodes[node_id].get('name', ''), + 'liquids': liquid_types, + 'reagent_name': vessel_data.get('reagent_name', '') + }) + + print(f"RESET_HANDLING: 可用容器列表:") + for container in available_containers: + print(f" - {container['id']}: {container['name']}") + print(f" 液体: {container['liquids']}") + print(f" 试剂: {container['reagent_name']}") + + raise ValueError(f"找不到溶剂 '{solvent}' 对应的容器。尝试了: {possible_names}") + + +def generate_reset_handling_protocol( + G: nx.DiGraph, + solvent: str, + **kwargs # 接收其他可能的参数但不使用 +) -> List[Dict[str, Any]]: + """ + 生成重置处理协议序列 + + Args: + G: 有向图,节点为容器和设备 + solvent: 溶剂名称(从XDL传入) + **kwargs: 其他可选参数,但不使用 + + Returns: + List[Dict[str, Any]]: 动作序列 + """ + action_sequence = [] + + # 固定参数 + target_vessel = "main_reactor" # 默认目标容器 + volume = 100.0 # 默认体积 100 mL + + print(f"RESET_HANDLING: 开始生成重置处理协议") + print(f" - 溶剂: {solvent}") + print(f" - 目标容器: {target_vessel}") + print(f" - 体积: {volume} mL") + + # 1. 验证目标容器存在 + if target_vessel not in G.nodes(): + raise ValueError(f"目标容器 '{target_vessel}' 不存在于系统中") + + # 2. 查找溶剂容器 + try: + solvent_vessel = find_solvent_vessel(G, solvent) + print(f"RESET_HANDLING: 找到溶剂容器: {solvent_vessel}") + except ValueError as e: + raise ValueError(f"无法找到溶剂 '{solvent}': {str(e)}") + + # 3. 验证路径存在 + try: + path = nx.shortest_path(G, source=solvent_vessel, target=target_vessel) + print(f"RESET_HANDLING: 找到路径: {' → '.join(path)}") + except nx.NetworkXNoPath: + raise ValueError(f"从溶剂容器 '{solvent_vessel}' 到目标容器 '{target_vessel}' 没有可用路径") + + # 4. 使用pump_protocol转移溶剂 + print(f"RESET_HANDLING: 开始转移溶剂 {volume} mL") + + try: + pump_actions = generate_pump_protocol_with_rinsing( + G=G, + from_vessel=solvent_vessel, + to_vessel=target_vessel, + volume=volume, + amount="", + time=0.0, + viscous=False, + rinsing_solvent="", # 重置处理不需要清洗 + rinsing_volume=0.0, + rinsing_repeats=0, + solid=False, + flowrate=2.5, # 正常流速 + transfer_flowrate=0.5 # 正常转移流速 + ) + + action_sequence.extend(pump_actions) + + except Exception as e: + raise ValueError(f"生成泵协议时出错: {str(e)}") + + # 5. 等待溶剂稳定 + action_sequence.append({ + "action_name": "wait", + "action_kwargs": { + "time": 10.0, + "description": f"等待溶剂 {solvent} 稳定" + } + }) + + print(f"RESET_HANDLING: 协议生成完成,共 {len(action_sequence)} 个动作") + print(f"RESET_HANDLING: 已添加 {volume} mL {solvent} 到 {target_vessel}") + + return action_sequence + + +# 测试函数 +def test_reset_handling_protocol(): + """测试重置处理协议""" + print("=== RESET HANDLING PROTOCOL 测试 ===") + print("测试完成") + + +if __name__ == "__main__": + test_reset_handling_protocol() \ No newline at end of file diff --git a/unilabos/messages/__init__.py b/unilabos/messages/__init__.py index 16947e6..47f21f1 100644 --- a/unilabos/messages/__init__.py +++ b/unilabos/messages/__init__.py @@ -178,6 +178,31 @@ class WashSolidProtocol(BaseModel): time: float = 0.0 repeats: int = 1 +class AdjustPHProtocol(BaseModel): + vessel: str = Field(..., description="目标容器") + ph_value: float = Field(..., description="目标pH值") # 改为 ph_value + reagent: str = Field(..., description="酸碱试剂名称") + # 移除其他可选参数,使用默认值 + +class ResetHandlingProtocol(BaseModel): + solvent: str = Field(..., description="溶剂名称") + +class DryProtocol(BaseModel): + compound: str = Field(..., description="化合物名称") + vessel: str = Field(..., description="目标容器") + +class RecrystallizeProtocol(BaseModel): + ratio: str = Field(..., description="溶剂比例(如 '1:1', '3:7')") + solvent1: str = Field(..., description="第一种溶剂名称") + solvent2: str = Field(..., description="第二种溶剂名称") + vessel: str = Field(..., description="目标容器") + volume: float = Field(..., description="总体积 (mL)") + +class HydrogenateProtocol(BaseModel): + temp: str = Field(..., description="反应温度(如 '45 °C')") + time: str = Field(..., description="反应时间(如 '2 h')") + vessel: str = Field(..., description="反应容器") + __all__ = [ "Point3D", "PumpTransferProtocol", "CleanProtocol", "SeparateProtocol", "EvaporateProtocol", "EvacuateAndRefillProtocol", "AGVTransferProtocol", @@ -185,6 +210,8 @@ __all__ = [ "HeatChillProtocol", "HeatChillStartProtocol", "HeatChillStopProtocol", "StirProtocol", "StartStirProtocol", "StopStirProtocol", "TransferProtocol", "CleanVesselProtocol", "DissolveProtocol", - "FilterThroughProtocol", "RunColumnProtocol", "WashSolidProtocol" + "FilterThroughProtocol", "RunColumnProtocol", "WashSolidProtocol", + "AdjustPHProtocol", "ResetHandlingProtocol", "DryProtocol", + "RecrystallizeProtocol", "HydrogenateProtocol" ] # End Protocols diff --git a/unilabos_msgs/CMakeLists.txt b/unilabos_msgs/CMakeLists.txt index db64116..7e8a146 100644 --- a/unilabos_msgs/CMakeLists.txt +++ b/unilabos_msgs/CMakeLists.txt @@ -1,4 +1,4 @@ -cmake_minimum_required(VERSION 3.5) +cmake_minimum_required(VERSION 3.16) project(unilabos_msgs) # Default to C99 @@ -28,7 +28,11 @@ set(action_files "action/HeatChill.action" "action/HeatChillStart.action" "action/HeatChillStop.action" - + "action/AdjustPH.action" + "action/ResetHandling.action" + "action/Dry.action" + "action/Hydrogenate.action" + "action/Recrystallize.action" "action/CleanVessel.action" "action/Dissolve.action" "action/FilterThrough.action" @@ -39,7 +43,6 @@ set(action_files "action/Add.action" "action/Centrifuge.action" "action/Crystallize.action" - "action/Dry.action" "action/Purge.action" "action/StartPurge.action" "action/StartStir.action" diff --git a/unilabos_msgs/action/AdjustPH.action b/unilabos_msgs/action/AdjustPH.action new file mode 100644 index 0000000..815ccf7 --- /dev/null +++ b/unilabos_msgs/action/AdjustPH.action @@ -0,0 +1,13 @@ +# Request - 与您的 AdjustPHProtocol 类匹配 +string vessel +float64 ph_value +string reagent +--- +# Result - 标准结果格式 +bool success +string message +string return_info +--- +# Feedback - 标准反馈格式 +string status +float64 progress \ No newline at end of file diff --git a/unilabos_msgs/action/Dry.action b/unilabos_msgs/action/Dry.action index 5692ef2..ec10c9a 100644 --- a/unilabos_msgs/action/Dry.action +++ b/unilabos_msgs/action/Dry.action @@ -1,17 +1,12 @@ -# Goal - 干燥操作的目标参数 -string vessel # 干燥容器 -float64 time # 干燥时间 (可选,秒) -float64 pressure # 压力 (可选,Pa) -float64 temp # 温度 (可选,摄氏度) -bool continue_heatchill # 是否继续加热冷却 +# Request +string compound # 化合物 +string vessel # 干燥容器 --- -# Result - 操作结果 +# Result bool success # 操作是否成功 string message # 结果消息 string return_info --- -# Feedback - 实时反馈 -float64 progress # 进度百分比 (0-100) -float64 current_temp # 当前温度 -float64 current_pressure # 当前压力 -string current_status # 当前状态描述 \ No newline at end of file +# Feedback +string status # 当前状态描述 +float64 progress # 进度百分比 (0-100) \ No newline at end of file diff --git a/unilabos_msgs/action/Hydrogenate.action b/unilabos_msgs/action/Hydrogenate.action new file mode 100644 index 0000000..72f9459 --- /dev/null +++ b/unilabos_msgs/action/Hydrogenate.action @@ -0,0 +1,13 @@ +# Request +string temp +string time +string vessel +--- +# Result +bool success +string message +string return_info +--- +# Feedback +string status +float64 progress \ No newline at end of file diff --git a/unilabos_msgs/action/Recrystallize.action b/unilabos_msgs/action/Recrystallize.action new file mode 100644 index 0000000..fe727e8 --- /dev/null +++ b/unilabos_msgs/action/Recrystallize.action @@ -0,0 +1,15 @@ +# Request +string ratio +string solvent1 +string solvent2 +string vessel +float64 volume +--- +# Result +bool success +string message +string return_info +--- +# Feedback +string status +float64 progress \ No newline at end of file diff --git a/unilabos_msgs/action/ResetHandling.action b/unilabos_msgs/action/ResetHandling.action new file mode 100644 index 0000000..d07f240 --- /dev/null +++ b/unilabos_msgs/action/ResetHandling.action @@ -0,0 +1,11 @@ +# Request +string solvent +--- +# Result +bool success +string message +string return_info +--- +# Feedback +string status +float64 progress \ No newline at end of file