From 23eb1139a98ab8b1e621e296b8ab47ad40b87a98 Mon Sep 17 00:00:00 2001 From: KCFeng425 <2100011801@stu.pku.edu.cn> Date: Thu, 10 Jul 2025 18:25:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E8=A1=A5=E4=BA=86=E4=B8=80=E4=BA=9B?= =?UTF-8?q?=E5=8D=95=E4=BD=8D=E5=A4=84=E7=90=86=EF=BC=8Cbump=20version=20t?= =?UTF-8?q?o=200.9.11?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- README_zh.md | 2 +- recipes/ros-humble-unilabos-msgs/recipe.yaml | 2 +- recipes/unilabos/recipe.yaml | 2 +- setup.py | 2 +- .../comprehensive_protocol/checklist.md | 10 +- unilabos/compile/evaporate_protocol.py | 122 +++++++-- unilabos/compile/heatchill_protocol.py | 172 ++++++++---- unilabos/compile/recrystallize_protocol.py | 121 +++++++-- unilabos/compile/stir_protocol.py | 154 +++++++++-- unilabos/compile/utils/unit_parser.py | 206 +++++++++++++++ unilabos/compile/wash_solid_protocol.py | 247 +++++++++++++++--- unilabos/devices/virtual/virtual_heatchill.py | 67 ++++- unilabos/devices/virtual/virtual_rotavap.py | 2 +- unilabos/messages/__init__.py | 2 +- unilabos/registry/devices/virtual_device.yaml | 4 +- unilabos_msgs/action/Evaporate.action | 12 +- unilabos_msgs/action/HeatChill.action | 2 +- unilabos_msgs/action/Recrystallize.action | 10 +- unilabos_msgs/action/Stir.action | 4 +- unilabos_msgs/action/WashSolid.action | 4 +- 21 files changed, 962 insertions(+), 187 deletions(-) create mode 100644 unilabos/compile/utils/unit_parser.py diff --git a/README.md b/README.md index 5d14705..918e6fa 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ conda env update --file unilabos-[YOUR_OS].yml -n environment_name # Currently, you need to install the `unilabos_msgs` package # You can download the system-specific package from the Release page -conda install ros-humble-unilabos-msgs-10-xxxxx.tar.bz2 +conda install ros-humble-unilabos-msgs-0.9.10-xxxxx.tar.bz2 # Install PyLabRobot and other prerequisites git clone https://github.com/PyLabRobot/pylabrobot plr_repo diff --git a/README_zh.md b/README_zh.md index 5bd588b..4163853 100644 --- a/README_zh.md +++ b/README_zh.md @@ -49,7 +49,7 @@ conda env update --file unilabos-[YOUR_OS].yml -n 环境名 # 现阶段,需要安装 `unilabos_msgs` 包 # 可以前往 Release 页面下载系统对应的包进行安装 -conda install ros-humble-unilabos-msgs-0.9.10-xxxxx.tar.bz2 +conda install ros-humble-unilabos-msgs-0.9.11-xxxxx.tar.bz2 # 安装PyLabRobot等前置 git clone https://github.com/PyLabRobot/pylabrobot plr_repo diff --git a/recipes/ros-humble-unilabos-msgs/recipe.yaml b/recipes/ros-humble-unilabos-msgs/recipe.yaml index 0445115..5807be8 100644 --- a/recipes/ros-humble-unilabos-msgs/recipe.yaml +++ b/recipes/ros-humble-unilabos-msgs/recipe.yaml @@ -1,6 +1,6 @@ package: name: ros-humble-unilabos-msgs - version: 0.9.10 + version: 0.9.11 source: path: ../../unilabos_msgs folder: ros-humble-unilabos-msgs/src/work diff --git a/recipes/unilabos/recipe.yaml b/recipes/unilabos/recipe.yaml index a576855..2f72c32 100644 --- a/recipes/unilabos/recipe.yaml +++ b/recipes/unilabos/recipe.yaml @@ -1,6 +1,6 @@ package: name: unilabos - version: "0.9.10" + version: "0.9.11" source: path: ../.. diff --git a/setup.py b/setup.py index 80163ea..8a7eebd 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ package_name = 'unilabos' setup( name=package_name, - version='0.9.10', + version='0.9.11', packages=find_packages(), include_package_data=True, install_requires=['setuptools'], diff --git a/test/experiments/comprehensive_protocol/checklist.md b/test/experiments/comprehensive_protocol/checklist.md index 5dee23a..67b4c3b 100644 --- a/test/experiments/comprehensive_protocol/checklist.md +++ b/test/experiments/comprehensive_protocol/checklist.md @@ -247,4 +247,12 @@ class HydrogenateProtocol(BaseModel): 还差 - \ No newline at end of file + + + +单位修复: + evaporate + heatchill + recrysitallize + stir + wash solid \ No newline at end of file diff --git a/unilabos/compile/evaporate_protocol.py b/unilabos/compile/evaporate_protocol.py index e3ffb86..4eedb3c 100644 --- a/unilabos/compile/evaporate_protocol.py +++ b/unilabos/compile/evaporate_protocol.py @@ -1,6 +1,7 @@ -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any, Optional, Union import networkx as nx import logging +import re logger = logging.getLogger(__name__) @@ -9,6 +10,63 @@ def debug_print(message): print(f"[EVAPORATE] {message}", flush=True) logger.info(f"[EVAPORATE] {message}") +def parse_time_input(time_input: Union[str, float]) -> float: + """ + 解析时间输入,支持带单位的字符串 + + Args: + time_input: 时间输入(如 "3 min", "180", "0.5 h" 等) + + Returns: + float: 时间(秒) + """ + if isinstance(time_input, (int, float)): + return float(time_input) + + if not time_input or not str(time_input).strip(): + return 180.0 # 默认3分钟 + + time_str = str(time_input).lower().strip() + debug_print(f"解析时间输入: '{time_str}'") + + # 处理未知时间 + if time_str in ['?', 'unknown', 'tbd']: + default_time = 180.0 # 默认3分钟 + debug_print(f"检测到未知时间,使用默认值: {default_time}s") + return default_time + + # 移除空格并提取数字和单位 + time_clean = re.sub(r'\s+', '', time_str) + + # 匹配数字和单位的正则表达式 + match = re.match(r'([0-9]*\.?[0-9]+)\s*(s|sec|second|min|minute|h|hr|hour|d|day)?', time_clean) + + if not match: + # 如果无法解析,尝试直接转换为数字(默认秒) + try: + value = float(time_str) + debug_print(f"时间解析: {time_str} → {value}s(无单位,默认秒)") + return value + except ValueError: + debug_print(f"⚠️ 无法解析时间: '{time_str}',使用默认值180s") + return 180.0 + + value = float(match.group(1)) + unit = match.group(2) or 's' # 默认单位为秒 + + # 转换为秒 + if unit in ['min', 'minute']: + time_sec = value * 60.0 # min -> s + elif unit in ['h', 'hr', 'hour']: + time_sec = value * 3600.0 # h -> s + elif unit in ['d', 'day']: + time_sec = value * 86400.0 # d -> s + else: # s, sec, second 或默认 + time_sec = value # 已经是s + + debug_print(f"时间转换: {value}{unit} → {time_sec}s") + return time_sec + def find_rotavap_device(G: nx.DiGraph, vessel: str = None) -> Optional[str]: """ 在组态图中查找旋转蒸发仪设备 @@ -111,20 +169,20 @@ def generate_evaporate_protocol( vessel: str, pressure: float = 0.1, temp: float = 60.0, - time: float = 180.0, + time: Union[str, float] = "180", # 🔧 修改:支持字符串时间 stir_speed: float = 100.0, solvent: str = "", - **kwargs # 接受任意额外参数,增强兼容性 + **kwargs ) -> List[Dict[str, Any]]: """ - 生成蒸发操作的协议序列 + 生成蒸发操作的协议序列 - 支持单位 Args: G: 设备图 vessel: 容器名称或旋转蒸发仪名称 pressure: 真空度 (bar),默认0.1 temp: 加热温度 (°C),默认60 - time: 蒸发时间 (秒),默认180 + time: 蒸发时间(支持 "3 min", "180", "0.5 h" 等) stir_speed: 旋转速度 (RPM),默认100 solvent: 溶剂名称(用于参数优化) **kwargs: 其他参数(兼容性) @@ -134,12 +192,12 @@ def generate_evaporate_protocol( """ debug_print("=" * 50) - debug_print("开始生成蒸发协议") + debug_print("开始生成蒸发协议(支持单位)") debug_print(f"输入参数:") debug_print(f" - vessel: {vessel}") debug_print(f" - pressure: {pressure} bar") debug_print(f" - temp: {temp}°C") - debug_print(f" - time: {time}s ({time/60:.1f}分钟)") + debug_print(f" - time: {time} (类型: {type(time)})") debug_print(f" - stir_speed: {stir_speed} RPM") debug_print(f" - solvent: '{solvent}'") debug_print("=" * 50) @@ -177,8 +235,15 @@ def generate_evaporate_protocol( debug_print(f"容器 '{vessel}' 不存在或类型不正确,使用旋转蒸发仪设备: {rotavap_device}") target_vessel = rotavap_device - # === 步骤3: 参数验证和修正 === - debug_print("步骤3: 参数验证和修正...") + # === 🔧 新增:步骤3:单位解析处理 === + debug_print("步骤3: 单位解析处理...") + + # 解析时间 + final_time = parse_time_input(time) + debug_print(f"时间解析: {time} → {final_time}s ({final_time/60:.1f}分钟)") + + # === 步骤4: 参数验证和修正 === + debug_print("步骤4: 参数验证和修正...") # 修正参数范围 if pressure <= 0 or pressure > 1.0: @@ -189,9 +254,9 @@ def generate_evaporate_protocol( debug_print(f"温度 {temp}°C 超出范围,修正为 60°C") temp = 60.0 - if time <= 0: - debug_print(f"时间 {time}s 无效,修正为 1800s") - time = 1800.0 + if final_time <= 0: + debug_print(f"时间 {final_time}s 无效,修正为 180s") + final_time = 180.0 if stir_speed < 10.0 or stir_speed > 300.0: debug_print(f"旋转速度 {stir_speed} RPM 超出范围,修正为 100 RPM") @@ -215,10 +280,10 @@ def generate_evaporate_protocol( pressure = min(pressure, 0.01) debug_print("高沸点溶剂:提高温度,降低真空度") - debug_print(f"最终参数: pressure={pressure}, temp={temp}, time={time}, stir_speed={stir_speed}") + debug_print(f"最终参数: pressure={pressure}, temp={temp}, time={final_time}, stir_speed={stir_speed}") - # === 步骤4: 生成动作序列 === - debug_print("步骤4: 生成动作序列...") + # === 步骤5: 生成动作序列 === + debug_print("步骤5: 生成动作序列...") action_sequence = [] @@ -237,7 +302,7 @@ def generate_evaporate_protocol( "vessel": target_vessel, "pressure": pressure, "temp": temp, - "time": time, + "time": final_time, "stir_speed": stir_speed, "solvent": solvent } @@ -256,7 +321,7 @@ def generate_evaporate_protocol( debug_print(f"总动作数: {len(action_sequence)}") debug_print(f"旋转蒸发仪: {rotavap_device}") debug_print(f"目标容器: {target_vessel}") - debug_print(f"蒸发参数: {pressure} bar, {temp}°C, {time}s, {stir_speed} RPM") + debug_print(f"蒸发参数: {pressure} bar, {temp}°C, {final_time}s, {stir_speed} RPM") debug_print("=" * 50) return action_sequence @@ -273,7 +338,7 @@ def generate_quick_evaporate_protocol( G, vessel, pressure=0.2, temp=40.0, - time=900.0, + time="15 min", # 🔧 使用带单位的时间 stir_speed=80.0, **kwargs ) @@ -288,7 +353,7 @@ def generate_gentle_evaporate_protocol( G, vessel, pressure=0.1, temp=50.0, - time=2700.0, + time="45 min", # 🔧 使用带单位的时间 stir_speed=60.0, **kwargs ) @@ -303,7 +368,7 @@ def generate_high_vacuum_evaporate_protocol( G, vessel, pressure=0.01, temp=35.0, - time=3600.0, + time="1 h", # 🔧 使用带单位的时间 stir_speed=120.0, **kwargs ) @@ -318,7 +383,22 @@ def generate_standard_evaporate_protocol( G, vessel, pressure=0.1, temp=60.0, - time=1800.0, + time="3 min", # 🔧 使用带单位的时间 stir_speed=100.0, **kwargs ) + +# 测试函数 +def test_time_parsing(): + """测试时间解析功能""" + print("=== EVAPORATE 时间解析测试 ===") + + test_times = ["3 min", "180", "0.5 h", "2 hours", "?", "unknown", "1.5", "30 s"] + for time_str in test_times: + result = parse_time_input(time_str) + print(f"时间解析: '{time_str}' → {result}s ({result/60:.1f}分钟)") + + print("✅ 测试完成") + +if __name__ == "__main__": + test_time_parsing() diff --git a/unilabos/compile/heatchill_protocol.py b/unilabos/compile/heatchill_protocol.py index 3e7f551..297015e 100644 --- a/unilabos/compile/heatchill_protocol.py +++ b/unilabos/compile/heatchill_protocol.py @@ -1,4 +1,4 @@ -from typing import List, Dict, Any +from typing import List, Dict, Any, Union import networkx as nx import logging import re @@ -10,6 +10,97 @@ def debug_print(message): print(f"[HEATCHILL] {message}", flush=True) logger.info(f"[HEATCHILL] {message}") +def parse_time_with_units(time_input: Union[str, float, int], default_unit: str = "s") -> float: + """ + 解析带单位的时间输入 + + Args: + time_input: 时间输入(如 "30 min", "1 h", "300", "?", 60.0) + default_unit: 默认单位(默认为秒) + + Returns: + float: 时间(秒) + """ + if not time_input: + return 0.0 + + # 处理数值输入 + if isinstance(time_input, (int, float)): + result = float(time_input) + debug_print(f"数值时间输入: {time_input} → {result}s(默认单位)") + return result + + # 处理字符串输入 + time_str = str(time_input).lower().strip() + debug_print(f"解析时间字符串: '{time_str}'") + + # 处理特殊值 + if time_str in ['?', 'unknown', 'tbd', 'to be determined']: + default_time = 300.0 # 5分钟默认值 + debug_print(f"检测到未知时间,使用默认值: {default_time}s") + return default_time + + # 如果是纯数字,使用默认单位 + try: + value = float(time_str) + if default_unit == "s": + result = value + elif default_unit in ["min", "minute"]: + result = value * 60.0 + elif default_unit in ["h", "hour"]: + result = value * 3600.0 + else: + result = value # 默认秒 + debug_print(f"纯数字输入: {time_str} → {result}s(单位: {default_unit})") + return result + except ValueError: + pass + + # 使用正则表达式匹配数字和单位 + pattern = r'(\d+\.?\d*)\s*([a-z]*)' + match = re.match(pattern, time_str) + + if not match: + debug_print(f"⚠️ 无法解析时间: '{time_str}',使用默认值: 60s") + return 60.0 + + value = float(match.group(1)) + unit = match.group(2) or default_unit + + # 单位转换映射 + unit_multipliers = { + # 秒 + 's': 1.0, + 'sec': 1.0, + 'second': 1.0, + 'seconds': 1.0, + + # 分钟 + 'm': 60.0, + 'min': 60.0, + 'mins': 60.0, + 'minute': 60.0, + 'minutes': 60.0, + + # 小时 + 'h': 3600.0, + 'hr': 3600.0, + 'hrs': 3600.0, + 'hour': 3600.0, + 'hours': 3600.0, + + # 天 + 'd': 86400.0, + 'day': 86400.0, + 'days': 86400.0, + } + + multiplier = unit_multipliers.get(unit, 1.0) + result = value * multiplier + + debug_print(f"时间解析: '{time_str}' → {value} {unit} → {result}s") + return result + def parse_temp_spec(temp_spec: str) -> float: """解析温度规格为具体温度""" if not temp_spec: @@ -117,7 +208,7 @@ def generate_heat_chill_protocol( G: nx.DiGraph, vessel: str, temp: float = 25.0, - time: float = 300.0, + time: Union[str, float] = "300", # 🔧 修改:支持字符串时间 temp_spec: str = "", time_spec: str = "", pressure: str = "", @@ -125,35 +216,18 @@ def generate_heat_chill_protocol( stir: bool = False, stir_speed: float = 300.0, purpose: str = "", - **kwargs # 🔧 接受额外参数,增强兼容性 + **kwargs ) -> List[Dict[str, Any]]: """ - 生成加热/冷却操作的协议序列 - - Args: - G: 设备图 - vessel: 加热容器名称(必需) - temp: 目标温度 (°C) - time: 加热时间 (秒) - temp_spec: 温度规格(如 'room temperature', 'reflux') - time_spec: 时间规格(如 'overnight', '2 h') - pressure: 压力规格(如 '1 mbar'),不做特殊处理 - reflux_solvent: 回流溶剂名称,不做特殊处理 - stir: 是否搅拌 - stir_speed: 搅拌速度 (RPM) - purpose: 操作目的 - **kwargs: 其他参数(兼容性) - - Returns: - List[Dict[str, Any]]: 加热操作的动作序列 + 生成加热/冷却操作的协议序列 - 支持单位 """ debug_print("=" * 50) - debug_print("开始生成加热冷却协议") + debug_print("开始生成加热冷却协议(支持单位)") debug_print(f"输入参数:") debug_print(f" - vessel: {vessel}") debug_print(f" - temp: {temp}°C") - debug_print(f" - time: {time}s ({time/60:.1f}分钟)") + debug_print(f" - time: {time} (类型: {type(time)})") debug_print(f" - temp_spec: {temp_spec}") debug_print(f" - time_spec: {time_spec}") debug_print(f" - pressure: {pressure}") @@ -176,6 +250,9 @@ def generate_heat_chill_protocol( if vessel not in G.nodes(): raise ValueError(f"容器 '{vessel}' 不存在于系统中") + # === 🔧 新增:单位解析处理 === + debug_print("步骤2: 单位解析处理...") + # 温度解析:优先使用 temp_spec,然后是 temp final_temp = temp if temp_spec: @@ -183,10 +260,12 @@ def generate_heat_chill_protocol( debug_print(f"温度解析: '{temp_spec}' → {final_temp}°C") # 时间解析:优先使用 time_spec,然后是 time - final_time = time if time_spec: - final_time = parse_time_spec(time_spec) - debug_print(f"时间解析: '{time_spec}' → {final_time}s ({final_time/60:.1f}分钟)") + final_time = parse_time_spec(time_spec) # 使用现有的time_spec解析 + debug_print(f"时间解析: '{time_spec}' → {final_time}s") + else: + final_time = parse_time_with_units(time, "s") + debug_print(f"时间解析: {time} → {final_time}s ({final_time/60:.1f}分钟)") # 参数范围验证 if final_temp < -50.0 or final_temp > 300.0: @@ -201,10 +280,10 @@ def generate_heat_chill_protocol( debug_print(f"搅拌速度 {stir_speed} RPM 超出范围,修正为 300 RPM") stir_speed = 300.0 - debug_print(f"✅ 参数验证通过") + debug_print(f"✅ 单位解析和参数验证通过") # === 查找加热设备 === - debug_print("步骤2: 查找加热设备...") + debug_print("步骤3: 查找加热设备...") try: heatchill_id = find_connected_heatchill(G, vessel) @@ -215,18 +294,18 @@ def generate_heat_chill_protocol( raise ValueError(f"无法找到加热设备: {str(e)}") # === 执行加热操作 === - debug_print("步骤3: 执行加热操作...") + debug_print("步骤4: 执行加热操作...") heatchill_action = { "device_id": heatchill_id, "action_name": "heat_chill", "action_kwargs": { "vessel": vessel, - "temp": final_temp, - "time": final_time, - "stir": stir, - "stir_speed": stir_speed, - "purpose": purpose or f"加热到 {final_temp}°C" + "temp": float(final_temp), # 🔧 确保是浮点数 + "time": float(final_time), # 🔧 确保是浮点数 + "stir": bool(stir), # 🔧 确保是布尔值 + "stir_speed": float(stir_speed), # 🔧 确保是浮点数 + "purpose": str(purpose or f"加热到 {final_temp}°C") # 🔧 确保是字符串 } } @@ -234,7 +313,7 @@ def generate_heat_chill_protocol( # === 总结 === debug_print("=" * 50) - debug_print(f"加热冷却协议生成完成") + debug_print(f"加热冷却协议生成完成(支持单位)") debug_print(f"总动作数: {len(action_sequence)}") debug_print(f"加热容器: {vessel}") debug_print(f"目标温度: {final_temp}°C") @@ -247,12 +326,11 @@ def generate_heat_chill_protocol( return action_sequence - def generate_heat_chill_to_temp_protocol( G: nx.DiGraph, vessel: str, temp: float = 25.0, - time: float = 300.0, + time: Union[str, float] = 300.0, # 🔧 也支持字符串 temp_spec: str = "", time_spec: str = "", pressure: str = "", @@ -269,7 +347,7 @@ def generate_heat_chill_to_temp_protocol( G: 设备图 vessel: 加热容器名称(必需) temp: 目标温度 (°C) - time: 加热时间 (秒) + time: 加热时间(支持字符串和数字) temp_spec: 温度规格(如 'room temperature', 'reflux') time_spec: 时间规格(如 'overnight', '2 h') pressure: 压力规格(如 '1 mbar'),不做特殊处理 @@ -288,7 +366,7 @@ def generate_heat_chill_to_temp_protocol( debug_print(f"输入参数:") debug_print(f" - vessel: {vessel}") debug_print(f" - temp: {temp}°C") - debug_print(f" - time: {time}s ({time / 60:.1f}分钟)") + debug_print(f" - time: {time} (类型: {type(time)})") debug_print(f" - temp_spec: {temp_spec}") debug_print(f" - time_spec: {time_spec}") debug_print(f" - pressure: {pressure}") @@ -317,11 +395,13 @@ def generate_heat_chill_to_temp_protocol( final_temp = parse_temp_spec(temp_spec) debug_print(f"温度解析: '{temp_spec}' → {final_temp}°C") - # 时间解析:优先使用 time_spec,然后是 time - final_time = time + # 🔧 修复:时间解析,支持字符串输入 if time_spec: final_time = parse_time_spec(time_spec) debug_print(f"时间解析: '{time_spec}' → {final_time}s ({final_time / 60:.1f}分钟)") + else: + final_time = parse_time_with_units(time, "s") + debug_print(f"时间解析: {time} → {final_time}s ({final_time/60:.1f}分钟)") # 参数范围验证 if final_temp < -50.0 or final_temp > 300.0: @@ -357,11 +437,11 @@ def generate_heat_chill_to_temp_protocol( "action_name": "heat_chill", "action_kwargs": { "vessel": vessel, - "temp": final_temp, - "time": final_time, - "stir": stir, - "stir_speed": stir_speed, - "purpose": purpose or f"加热到 {final_temp}°C" + "temp": float(final_temp), # 🔧 确保是浮点数 + "time": float(final_time), # 🔧 确保是浮点数 + "stir": bool(stir), # 🔧 确保是布尔值 + "stir_speed": float(stir_speed), # 🔧 确保是浮点数 + "purpose": str(purpose or f"加热到 {final_temp}°C") # 🔧 确保是字符串 } } diff --git a/unilabos/compile/recrystallize_protocol.py b/unilabos/compile/recrystallize_protocol.py index b69d88b..4e5f592 100644 --- a/unilabos/compile/recrystallize_protocol.py +++ b/unilabos/compile/recrystallize_protocol.py @@ -1,8 +1,80 @@ import networkx as nx -from typing import List, Dict, Any, Tuple +import re +from typing import List, Dict, Any, Tuple, Union from .pump_protocol import generate_pump_protocol_with_rinsing +def parse_volume_with_units(volume_input: Union[str, float, int], default_unit: str = "mL") -> float: + """ + 解析带单位的体积输入 + + Args: + volume_input: 体积输入(如 "100 mL", "2.5 L", "500", "?", 100.0) + default_unit: 默认单位(默认为毫升) + + Returns: + float: 体积(毫升) + """ + if not volume_input: + return 0.0 + + # 处理数值输入 + if isinstance(volume_input, (int, float)): + result = float(volume_input) + print(f"RECRYSTALLIZE: 数值体积输入: {volume_input} → {result}mL(默认单位)") + return result + + # 处理字符串输入 + volume_str = str(volume_input).lower().strip() + print(f"RECRYSTALLIZE: 解析体积字符串: '{volume_str}'") + + # 处理特殊值 + if volume_str in ['?', 'unknown', 'tbd', 'to be determined']: + default_volume = 50.0 # 50mL默认值 + print(f"RECRYSTALLIZE: 检测到未知体积,使用默认值: {default_volume}mL") + return default_volume + + # 如果是纯数字,使用默认单位 + try: + value = float(volume_str) + if default_unit.lower() in ["ml", "milliliter"]: + result = value + elif default_unit.lower() in ["l", "liter"]: + result = value * 1000.0 + elif default_unit.lower() in ["μl", "ul", "microliter"]: + result = value / 1000.0 + else: + result = value # 默认mL + print(f"RECRYSTALLIZE: 纯数字输入: {volume_str} → {result}mL(单位: {default_unit})") + return result + except ValueError: + pass + + # 移除空格并提取数字和单位 + volume_clean = re.sub(r'\s+', '', volume_str) + + # 匹配数字和单位的正则表达式 + match = re.match(r'([0-9]*\.?[0-9]+)\s*(ml|l|μl|ul|microliter|milliliter|liter)?', volume_clean) + + if not match: + print(f"RECRYSTALLIZE: ⚠️ 无法解析体积: '{volume_str}',使用默认值: 50mL") + return 50.0 + + value = float(match.group(1)) + unit = match.group(2) or default_unit.lower() + + # 转换为毫升 + if unit in ['l', 'liter']: + volume = value * 1000.0 # L -> mL + elif unit in ['μl', 'ul', 'microliter']: + volume = value / 1000.0 # μL -> mL + else: # ml, milliliter 或默认 + volume = value # 已经是mL + + print(f"RECRYSTALLIZE: 体积解析: '{volume_str}' → {value} {unit} → {volume}mL") + return volume + + def parse_ratio(ratio_str: str) -> Tuple[float, float]: """ 解析比例字符串,支持多种格式 @@ -111,11 +183,11 @@ def generate_recrystallize_protocol( solvent1: str, solvent2: str, vessel: str, - volume: float, - **kwargs # 接收其他可能的参数但不使用 + volume: Union[str, float], # 🔧 修改:支持字符串和数值 + **kwargs ) -> List[Dict[str, Any]]: """ - 生成重结晶协议序列 + 生成重结晶协议序列 - 支持单位 Args: G: 有向图,节点为容器和设备 @@ -123,38 +195,42 @@ def generate_recrystallize_protocol( solvent1: 第一种溶剂名称 solvent2: 第二种溶剂名称 vessel: 目标容器 - volume: 总体积 (mL) - **kwargs: 其他可选参数,但不使用 + volume: 总体积(支持 "100 mL", "50", "2.5 L" 等) + **kwargs: 其他可选参数 Returns: List[Dict[str, Any]]: 动作序列 """ action_sequence = [] - print(f"RECRYSTALLIZE: 开始生成重结晶协议") + print(f"RECRYSTALLIZE: 开始生成重结晶协议(支持单位)") print(f" - 比例: {ratio}") print(f" - 溶剂1: {solvent1}") print(f" - 溶剂2: {solvent2}") print(f" - 容器: {vessel}") - print(f" - 总体积: {volume} mL") + print(f" - 总体积: {volume} (类型: {type(volume)})") # 1. 验证目标容器存在 if vessel not in G.nodes(): raise ValueError(f"目标容器 '{vessel}' 不存在于系统中") - # 2. 解析比例 + # 2. 🔧 新增:解析体积(支持单位) + final_volume = parse_volume_with_units(volume, "mL") + print(f"RECRYSTALLIZE: 解析体积: {volume} → {final_volume}mL") + + # 3. 解析比例 ratio1, ratio2 = parse_ratio(ratio) total_ratio = ratio1 + ratio2 - # 3. 计算各溶剂体积 - volume1 = volume * (ratio1 / total_ratio) - volume2 = volume * (ratio2 / total_ratio) + # 4. 计算各溶剂体积 + volume1 = final_volume * (ratio1 / total_ratio) + volume2 = final_volume * (ratio2 / total_ratio) print(f"RECRYSTALLIZE: 解析比例: {ratio1}:{ratio2}") print(f"RECRYSTALLIZE: {solvent1} 体积: {volume1:.2f} mL") print(f"RECRYSTALLIZE: {solvent2} 体积: {volume2:.2f} mL") - # 4. 查找溶剂容器 + # 5. 查找溶剂容器 try: solvent1_vessel = find_solvent_vessel(G, solvent1) print(f"RECRYSTALLIZE: 找到溶剂1容器: {solvent1_vessel}") @@ -167,7 +243,7 @@ def generate_recrystallize_protocol( except ValueError as e: raise ValueError(f"无法找到溶剂2 '{solvent2}': {str(e)}") - # 5. 验证路径存在 + # 6. 验证路径存在 try: path1 = nx.shortest_path(G, source=solvent1_vessel, target=vessel) print(f"RECRYSTALLIZE: 溶剂1路径: {' → '.join(path1)}") @@ -180,7 +256,7 @@ def generate_recrystallize_protocol( except nx.NetworkXNoPath: raise ValueError(f"从溶剂2容器 '{solvent2_vessel}' 到目标容器 '{vessel}' 没有可用路径") - # 6. 添加第一种溶剂 + # 7. 添加第一种溶剂 print(f"RECRYSTALLIZE: 开始添加溶剂1 {volume1:.2f} mL") try: @@ -188,7 +264,7 @@ def generate_recrystallize_protocol( G=G, from_vessel=solvent1_vessel, to_vessel=vessel, - volume=volume1, + volume=volume1, # 使用解析后的体积 amount="", time=0.0, viscous=False, @@ -205,7 +281,7 @@ def generate_recrystallize_protocol( except Exception as e: raise ValueError(f"生成溶剂1泵协议时出错: {str(e)}") - # 7. 等待溶剂1稳定 + # 8. 等待溶剂1稳定 action_sequence.append({ "action_name": "wait", "action_kwargs": { @@ -214,7 +290,7 @@ def generate_recrystallize_protocol( } }) - # 8. 添加第二种溶剂 + # 9. 添加第二种溶剂 print(f"RECRYSTALLIZE: 开始添加溶剂2 {volume2:.2f} mL") try: @@ -222,7 +298,7 @@ def generate_recrystallize_protocol( G=G, from_vessel=solvent2_vessel, to_vessel=vessel, - volume=volume2, + volume=volume2, # 使用解析后的体积 amount="", time=0.0, viscous=False, @@ -239,7 +315,7 @@ def generate_recrystallize_protocol( except Exception as e: raise ValueError(f"生成溶剂2泵协议时出错: {str(e)}") - # 9. 等待溶剂2稳定 + # 10. 等待溶剂2稳定 action_sequence.append({ "action_name": "wait", "action_kwargs": { @@ -248,17 +324,18 @@ def generate_recrystallize_protocol( } }) - # 10. 等待重结晶完成 + # 11. 等待重结晶完成 action_sequence.append({ "action_name": "wait", "action_kwargs": { "time": 600.0, # 等待10分钟进行重结晶 - "description": f"等待重结晶完成({solvent1}:{solvent2} = {ratio})" + "description": f"等待重结晶完成({solvent1}:{solvent2} = {ratio},总体积 {final_volume}mL)" } }) print(f"RECRYSTALLIZE: 协议生成完成,共 {len(action_sequence)} 个动作") print(f"RECRYSTALLIZE: 预计总时间: {620/60:.1f} 分钟") + print(f"RECRYSTALLIZE: 总体积: {final_volume}mL") return action_sequence diff --git a/unilabos/compile/stir_protocol.py b/unilabos/compile/stir_protocol.py index a67155d..5343a2a 100644 --- a/unilabos/compile/stir_protocol.py +++ b/unilabos/compile/stir_protocol.py @@ -10,6 +10,97 @@ def debug_print(message): print(f"[STIR] {message}", flush=True) logger.info(f"[STIR] {message}") +def parse_time_with_units(time_input: Union[str, float, int], default_unit: str = "s") -> float: + """ + 解析带单位的时间输入 + + Args: + time_input: 时间输入(如 "30 min", "1 h", "300", "?", 60.0) + default_unit: 默认单位(默认为秒) + + Returns: + float: 时间(秒) + """ + if not time_input: + return 0.0 + + # 处理数值输入 + if isinstance(time_input, (int, float)): + result = float(time_input) + debug_print(f"数值时间输入: {time_input} → {result}s(默认单位)") + return result + + # 处理字符串输入 + time_str = str(time_input).lower().strip() + debug_print(f"解析时间字符串: '{time_str}'") + + # 处理特殊值 + if time_str in ['?', 'unknown', 'tbd', 'to be determined']: + default_time = 300.0 # 5分钟默认值 + debug_print(f"检测到未知时间,使用默认值: {default_time}s") + return default_time + + # 如果是纯数字,使用默认单位 + try: + value = float(time_str) + if default_unit == "s": + result = value + elif default_unit in ["min", "minute"]: + result = value * 60.0 + elif default_unit in ["h", "hour"]: + result = value * 3600.0 + else: + result = value # 默认秒 + debug_print(f"纯数字输入: {time_str} → {result}s(单位: {default_unit})") + return result + except ValueError: + pass + + # 使用正则表达式匹配数字和单位 + pattern = r'(\d+\.?\d*)\s*([a-z]*)' + match = re.match(pattern, time_str) + + if not match: + debug_print(f"⚠️ 无法解析时间: '{time_str}',使用默认值: 60s") + return 60.0 + + value = float(match.group(1)) + unit = match.group(2) or default_unit + + # 单位转换映射 + unit_multipliers = { + # 秒 + 's': 1.0, + 'sec': 1.0, + 'second': 1.0, + 'seconds': 1.0, + + # 分钟 + 'm': 60.0, + 'min': 60.0, + 'mins': 60.0, + 'minute': 60.0, + 'minutes': 60.0, + + # 小时 + 'h': 3600.0, + 'hr': 3600.0, + 'hrs': 3600.0, + 'hour': 3600.0, + 'hours': 3600.0, + + # 天 + 'd': 86400.0, + 'day': 86400.0, + 'days': 86400.0, + } + + multiplier = unit_multipliers.get(unit, 1.0) + result = value * multiplier + + debug_print(f"时间解析: '{time_str}' → {value} {unit} → {result}s") + return result + def parse_time_spec(time_spec: str) -> float: """ 解析时间规格字符串为秒数 @@ -211,27 +302,26 @@ def find_connected_stirrer(G: nx.DiGraph, vessel: str = None) -> str: def generate_stir_protocol( G: nx.DiGraph, vessel: str, - time: Union[str, float, int] = 300.0, - stir_time: Union[str, float, int] = 0.0, + time: Union[str, float, int] = "300", # 🔧 修改:默认为字符串 + stir_time: Union[str, float, int] = "0", # 🔧 修改:支持字符串 time_spec: str = "", event: str = "", stir_speed: float = 200.0, - settling_time: float = 60.0, + settling_time: Union[str, float] = "60", # 🔧 修改:支持字符串 **kwargs ) -> List[Dict[str, Any]]: """ - 生成搅拌操作的协议序列 - 定时搅拌 + 沉降 - 支持 time 和 stir_time 参数统一处理 + 生成搅拌操作的协议序列 - 支持单位 Args: G: 设备图 vessel: 搅拌容器名称(必需) - time: 搅拌时间(支持多种格式) - stir_time: 搅拌时间(与time等效) + time: 搅拌时间(支持 "5 min", "300", "0.5 h" 等) + stir_time: 搅拌时间(与time等效,支持单位) time_spec: 时间规格(优先级最高) event: 事件标识 stir_speed: 搅拌速度 (RPM),默认200 RPM - settling_time: 沉降时间 (秒),默认60s + settling_time: 沉降时间(支持单位,默认60秒) **kwargs: 其他参数(兼容性) Returns: @@ -239,7 +329,7 @@ def generate_stir_protocol( """ debug_print("=" * 50) - debug_print("开始生成搅拌协议") + debug_print("开始生成搅拌协议(支持单位)") debug_print(f"输入参数:") debug_print(f" - vessel: {vessel}") debug_print(f" - time: {time}") @@ -265,19 +355,29 @@ def generate_stir_protocol( debug_print(f"✅ 参数验证通过") - # === 时间处理(统一 time 和 stir_time)=== - debug_print("步骤2: 时间处理...") + # === 🔧 新增:单位解析处理 === + debug_print("步骤2: 单位解析处理...") - # 确定实际使用的时间值 - actual_time_input = stir_time if stir_time else time + # 确定实际使用的时间值(stir_time优先) + actual_time_input = stir_time if stir_time not in ["0", 0, 0.0] else time - # 解析时间 - parsed_time = parse_time_input(actual_time_input, time_spec) + # 解析时间(time_spec > actual_time_input) + if time_spec: + parsed_time = parse_time_spec(time_spec) # 使用现有的time_spec解析 + debug_print(f"使用time_spec: '{time_spec}' → {parsed_time}s") + else: + parsed_time = parse_time_with_units(actual_time_input, "s") + debug_print(f"解析时间: {actual_time_input} → {parsed_time}s") + + # 解析沉降时间 + parsed_settling_time = parse_time_with_units(settling_time, "s") + debug_print(f"解析沉降时间: {settling_time} → {parsed_settling_time}s") debug_print(f"时间解析结果:") debug_print(f" - 原始输入: time={time}, stir_time={stir_time}") debug_print(f" - 时间规格: {time_spec}") - debug_print(f" - 最终时间: {parsed_time}秒 ({parsed_time/60:.1f}分钟)") + debug_print(f" - 最终搅拌时间: {parsed_time}s ({parsed_time/60:.1f}分钟)") + debug_print(f" - 最终沉降时间: {parsed_settling_time}s ({parsed_settling_time/60:.1f}分钟)") # 修正参数范围 if parsed_time < 0: @@ -294,12 +394,12 @@ def generate_stir_protocol( debug_print(f"搅拌速度 {stir_speed} RPM 过高,修正为 1000 RPM") stir_speed = 1000.0 - if settling_time < 0: - debug_print(f"沉降时间 {settling_time}s 无效,修正为 60s") - settling_time = 60.0 - elif settling_time > 1800: - debug_print(f"沉降时间 {settling_time}s 过长,修正为 600s") - settling_time = 600.0 + if parsed_settling_time < 0: + debug_print(f"沉降时间 {parsed_settling_time}s 无效,修正为 60s") + parsed_settling_time = 60.0 + elif parsed_settling_time > 1800: + debug_print(f"沉降时间 {parsed_settling_time}s 过长,修正为 600s") + parsed_settling_time = 600.0 # === 查找搅拌设备 === debug_print("步骤3: 查找搅拌设备...") @@ -318,12 +418,12 @@ def generate_stir_protocol( # 构建搅拌动作参数 stir_kwargs = { "vessel": vessel, - "time": str(time), # 保持原始字符串格式 + "time": str(time), # 保持原始字符串格式 "event": event, "time_spec": time_spec, - "stir_time": parsed_time, # 解析后的时间(秒) + "stir_time": parsed_time, # 解析后的时间(秒) "stir_speed": stir_speed, - "settling_time": settling_time + "settling_time": parsed_settling_time # 解析后的沉降时间(秒) } debug_print(f"搅拌参数: {stir_kwargs}") @@ -338,10 +438,10 @@ def generate_stir_protocol( # === 总结 === debug_print("=" * 50) - debug_print(f"搅拌协议生成完成") + debug_print(f"搅拌协议生成完成(支持单位)") debug_print(f"总动作数: {len(action_sequence)}") debug_print(f"搅拌容器: {vessel}") - debug_print(f"搅拌参数: {stir_speed} RPM, {parsed_time}s, 沉降 {settling_time}s") + debug_print(f"搅拌参数: {stir_speed} RPM, {parsed_time}s, 沉降 {parsed_settling_time}s") debug_print("=" * 50) return action_sequence diff --git a/unilabos/compile/utils/unit_parser.py b/unilabos/compile/utils/unit_parser.py new file mode 100644 index 0000000..d1d297c --- /dev/null +++ b/unilabos/compile/utils/unit_parser.py @@ -0,0 +1,206 @@ +""" +统一的单位解析工具模块 +支持时间、体积、质量等各种单位的解析 +""" + +import re +import logging +from typing import Union + +logger = logging.getLogger(__name__) + +def debug_print(message, prefix="[UNIT_PARSER]"): + """调试输出""" + print(f"{prefix} {message}", flush=True) + logger.info(f"{prefix} {message}") + +def parse_time_with_units(time_input: Union[str, float, int], default_unit: str = "s") -> float: + """ + 解析带单位的时间输入 + + Args: + time_input: 时间输入(如 "30 min", "1 h", "300", "?", 60.0) + default_unit: 默认单位(默认为秒) + + Returns: + float: 时间(秒) + """ + if not time_input: + return 0.0 + + # 处理数值输入 + if isinstance(time_input, (int, float)): + result = float(time_input) + debug_print(f"数值时间输入: {time_input} → {result}s(默认单位)") + return result + + # 处理字符串输入 + time_str = str(time_input).lower().strip() + debug_print(f"解析时间字符串: '{time_str}'") + + # 处理特殊值 + if time_str in ['?', 'unknown', 'tbd', 'to be determined']: + default_time = 300.0 # 5分钟默认值 + debug_print(f"检测到未知时间,使用默认值: {default_time}s") + return default_time + + # 如果是纯数字,使用默认单位 + try: + value = float(time_str) + if default_unit == "s": + result = value + elif default_unit in ["min", "minute"]: + result = value * 60.0 + elif default_unit in ["h", "hour"]: + result = value * 3600.0 + else: + result = value # 默认秒 + debug_print(f"纯数字输入: {time_str} → {result}s(单位: {default_unit})") + return result + except ValueError: + pass + + # 使用正则表达式匹配数字和单位 + pattern = r'(\d+\.?\d*)\s*([a-z]*)' + match = re.match(pattern, time_str) + + if not match: + debug_print(f"⚠️ 无法解析时间: '{time_str}',使用默认值: 60s") + return 60.0 + + value = float(match.group(1)) + unit = match.group(2) or default_unit + + # 单位转换映射 + unit_multipliers = { + # 秒 + 's': 1.0, + 'sec': 1.0, + 'second': 1.0, + 'seconds': 1.0, + + # 分钟 + 'm': 60.0, + 'min': 60.0, + 'mins': 60.0, + 'minute': 60.0, + 'minutes': 60.0, + + # 小时 + 'h': 3600.0, + 'hr': 3600.0, + 'hrs': 3600.0, + 'hour': 3600.0, + 'hours': 3600.0, + + # 天 + 'd': 86400.0, + 'day': 86400.0, + 'days': 86400.0, + } + + multiplier = unit_multipliers.get(unit, 1.0) + result = value * multiplier + + debug_print(f"时间解析: '{time_str}' → {value} {unit} → {result}s") + return result + +def parse_volume_with_units(volume_input: Union[str, float, int], default_unit: str = "mL") -> float: + """ + 解析带单位的体积输入 + + Args: + volume_input: 体积输入(如 "100 mL", "2.5 L", "500", "?", 100.0) + default_unit: 默认单位(默认为毫升) + + Returns: + float: 体积(毫升) + """ + if not volume_input: + return 0.0 + + # 处理数值输入 + if isinstance(volume_input, (int, float)): + result = float(volume_input) + debug_print(f"数值体积输入: {volume_input} → {result}mL(默认单位)") + return result + + # 处理字符串输入 + volume_str = str(volume_input).lower().strip() + debug_print(f"解析体积字符串: '{volume_str}'") + + # 处理特殊值 + if volume_str in ['?', 'unknown', 'tbd', 'to be determined']: + default_volume = 50.0 # 50mL默认值 + debug_print(f"检测到未知体积,使用默认值: {default_volume}mL") + return default_volume + + # 如果是纯数字,使用默认单位 + try: + value = float(volume_str) + if default_unit.lower() in ["ml", "milliliter"]: + result = value + elif default_unit.lower() in ["l", "liter"]: + result = value * 1000.0 + elif default_unit.lower() in ["μl", "ul", "microliter"]: + result = value / 1000.0 + else: + result = value # 默认mL + debug_print(f"纯数字输入: {volume_str} → {result}mL(单位: {default_unit})") + return result + except ValueError: + pass + + # 移除空格并提取数字和单位 + volume_clean = re.sub(r'\s+', '', volume_str) + + # 匹配数字和单位的正则表达式 + match = re.match(r'([0-9]*\.?[0-9]+)\s*(ml|l|μl|ul|microliter|milliliter|liter)?', volume_clean) + + if not match: + debug_print(f"⚠️ 无法解析体积: '{volume_str}',使用默认值: 50mL") + return 50.0 + + value = float(match.group(1)) + unit = match.group(2) or default_unit.lower() + + # 转换为毫升 + if unit in ['l', 'liter']: + volume = value * 1000.0 # L -> mL + elif unit in ['μl', 'ul', 'microliter']: + volume = value / 1000.0 # μL -> mL + else: # ml, milliliter 或默认 + volume = value # 已经是mL + + debug_print(f"体积解析: '{volume_str}' → {value} {unit} → {volume}mL") + return volume + +# 测试函数 +def test_unit_parser(): + """测试单位解析功能""" + print("=== 单位解析器测试 ===") + + # 测试时间解析 + time_tests = [ + "30 min", "1 h", "300", "5.5 h", "?", 60.0, "2 hours", "30 s" + ] + + print("\n时间解析测试:") + for time_input in time_tests: + result = parse_time_with_units(time_input) + print(f" {time_input} → {result}s ({result/60:.1f}min)") + + # 测试体积解析 + volume_tests = [ + "100 mL", "2.5 L", "500", "?", 100.0, "500 μL", "1 liter" + ] + + print("\n体积解析测试:") + for volume_input in volume_tests: + result = parse_volume_with_units(volume_input) + print(f" {volume_input} → {result}mL") + + print("\n✅ 测试完成") + +if __name__ == "__main__": + test_unit_parser() \ No newline at end of file diff --git a/unilabos/compile/wash_solid_protocol.py b/unilabos/compile/wash_solid_protocol.py index 1a5c5e3..8d43108 100644 --- a/unilabos/compile/wash_solid_protocol.py +++ b/unilabos/compile/wash_solid_protocol.py @@ -10,6 +10,167 @@ def debug_print(message): print(f"[WASH_SOLID] {message}", flush=True) logger.info(f"[WASH_SOLID] {message}") +def parse_time_with_units(time_input: Union[str, float, int], default_unit: str = "s") -> float: + """ + 解析带单位的时间输入 + + Args: + time_input: 时间输入(如 "30 min", "1 h", "300", "?", 60.0) + default_unit: 默认单位(默认为秒) + + Returns: + float: 时间(秒) + """ + if not time_input: + return 0.0 + + # 处理数值输入 + if isinstance(time_input, (int, float)): + result = float(time_input) + debug_print(f"数值时间输入: {time_input} → {result}s(默认单位)") + return result + + # 处理字符串输入 + time_str = str(time_input).lower().strip() + debug_print(f"解析时间字符串: '{time_str}'") + + # 处理特殊值 + if time_str in ['?', 'unknown', 'tbd', 'to be determined']: + default_time = 300.0 # 5分钟默认值 + debug_print(f"检测到未知时间,使用默认值: {default_time}s") + return default_time + + # 如果是纯数字,使用默认单位 + try: + value = float(time_str) + if default_unit == "s": + result = value + elif default_unit in ["min", "minute"]: + result = value * 60.0 + elif default_unit in ["h", "hour"]: + result = value * 3600.0 + else: + result = value # 默认秒 + debug_print(f"纯数字输入: {time_str} → {result}s(单位: {default_unit})") + return result + except ValueError: + pass + + # 使用正则表达式匹配数字和单位 + pattern = r'(\d+\.?\d*)\s*([a-z]*)' + match = re.match(pattern, time_str) + + if not match: + debug_print(f"⚠️ 无法解析时间: '{time_str}',使用默认值: 60s") + return 60.0 + + value = float(match.group(1)) + unit = match.group(2) or default_unit + + # 单位转换映射 + unit_multipliers = { + # 秒 + 's': 1.0, + 'sec': 1.0, + 'second': 1.0, + 'seconds': 1.0, + + # 分钟 + 'm': 60.0, + 'min': 60.0, + 'mins': 60.0, + 'minute': 60.0, + 'minutes': 60.0, + + # 小时 + 'h': 3600.0, + 'hr': 3600.0, + 'hrs': 3600.0, + 'hour': 3600.0, + 'hours': 3600.0, + + # 天 + 'd': 86400.0, + 'day': 86400.0, + 'days': 86400.0, + } + + multiplier = unit_multipliers.get(unit, 1.0) + result = value * multiplier + + debug_print(f"时间解析: '{time_str}' → {value} {unit} → {result}s") + return result + +def parse_volume_with_units(volume_input: Union[str, float, int], default_unit: str = "mL") -> float: + """ + 解析带单位的体积输入 + + Args: + volume_input: 体积输入(如 "100 mL", "2.5 L", "500", "?", 100.0) + default_unit: 默认单位(默认为毫升) + + Returns: + float: 体积(毫升) + """ + if not volume_input: + return 0.0 + + # 处理数值输入 + if isinstance(volume_input, (int, float)): + result = float(volume_input) + debug_print(f"数值体积输入: {volume_input} → {result}mL(默认单位)") + return result + + # 处理字符串输入 + volume_str = str(volume_input).lower().strip() + debug_print(f"解析体积字符串: '{volume_str}'") + + # 处理特殊值 + if volume_str in ['?', 'unknown', 'tbd', 'to be determined']: + default_volume = 50.0 # 50mL默认值 + debug_print(f"检测到未知体积,使用默认值: {default_volume}mL") + return default_volume + + # 如果是纯数字,使用默认单位 + try: + value = float(volume_str) + if default_unit.lower() in ["ml", "milliliter"]: + result = value + elif default_unit.lower() in ["l", "liter"]: + result = value * 1000.0 + elif default_unit.lower() in ["μl", "ul", "microliter"]: + result = value / 1000.0 + else: + result = value # 默认mL + debug_print(f"纯数字输入: {volume_str} → {result}mL(单位: {default_unit})") + return result + except ValueError: + pass + + # 移除空格并提取数字和单位 + volume_clean = re.sub(r'\s+', '', volume_str) + + # 匹配数字和单位的正则表达式 + match = re.match(r'([0-9]*\.?[0-9]+)\s*(ml|l|μl|ul|microliter|milliliter|liter)?', volume_clean) + + if not match: + debug_print(f"⚠️ 无法解析体积: '{volume_str}',使用默认值: 50mL") + return 50.0 + + value = float(match.group(1)) + unit = match.group(2) or default_unit.lower() + + # 转换为毫升 + if unit in ['l', 'liter']: + volume = value * 1000.0 # L -> mL + elif unit in ['μl', 'ul', 'microliter']: + volume = value / 1000.0 # μL -> mL + else: # ml, milliliter 或默认 + volume = value # 已经是mL + + debug_print(f"体积解析: '{volume_str}' → {value} {unit} → {volume}mL") + return volume + def parse_volume_spec(volume_spec: str) -> float: """ 解析体积规格字符串为毫升数 @@ -357,46 +518,46 @@ def generate_wash_solid_protocol( G: nx.DiGraph, vessel: str, solvent: str, - volume: Union[float, str] = 0.0, # 🔧 修改:支持字符串输入 + volume: Union[float, str] = "50", # 🔧 修改:默认为字符串 filtrate_vessel: str = "", temp: float = 25.0, stir: bool = False, stir_speed: float = 0.0, - time: float = 0.0, + time: Union[str, float] = "0", # 🔧 修改:支持字符串时间 repeats: int = 1, - # === 新增参数 === - volume_spec: str = "", # 体积规格 - repeats_spec: str = "", # 重复次数规格 - mass: str = "", # 🔧 新增:固体质量(用于转换体积) - event: str = "", # 事件标识符 + # === 现有参数保持不变 === + volume_spec: str = "", + repeats_spec: str = "", + mass: str = "", + event: str = "", **kwargs ) -> List[Dict[str, Any]]: """ - 生成固体清洗操作的协议序列 - 增强版 + 生成固体清洗操作的协议序列 - 增强版(支持单位) - 支持多种体积输入方式: - 1. volume_spec: "small volume", "large volume" 等 - 2. mass: "10 g", "2.5 kg", "500 mg" 等(转换为体积) - 3. volume: 数值或字符串 "10", "10 mL", "2.5 L" 等 + 支持多种输入方式: + 1. volume: "100 mL", "50", "2.5 L", "?" + 2. time: "5 min", "300", "0.5 h", "?" + 3. volume_spec: "small volume", "large volume" 等 + 4. mass: "10 g", "2.5 kg", "500 mg" 等(转换为体积) """ debug_print("=" * 60) - debug_print("开始生成固体清洗协议") + debug_print("开始生成固体清洗协议(支持单位)") debug_print(f"输入参数:") debug_print(f" - vessel: {vessel}") debug_print(f" - solvent: {solvent}") debug_print(f" - volume: {volume} (类型: {type(volume)})") + debug_print(f" - time: {time} (类型: {type(time)})") debug_print(f" - volume_spec: '{volume_spec}'") - debug_print(f" - mass: '{mass}'") # 🔧 新增日志 + debug_print(f" - mass: '{mass}'") debug_print(f" - filtrate_vessel: '{filtrate_vessel}'") debug_print(f" - temp: {temp}°C") debug_print(f" - stir: {stir}") debug_print(f" - stir_speed: {stir_speed} RPM") - debug_print(f" - time: {time}s") debug_print(f" - repeats: {repeats}") debug_print(f" - repeats_spec: '{repeats_spec}'") debug_print(f" - event: '{event}'") - debug_print(f" - 其他参数: {kwargs}") debug_print("=" * 60) action_sequence = [] @@ -416,12 +577,27 @@ def generate_wash_solid_protocol( debug_print(f"✅ 必需参数验证通过") - # === 参数处理 === - debug_print("步骤2: 参数处理...") + # === 🔧 新增:单位解析处理 === + debug_print("步骤2: 单位解析处理...") - # 🔧 修改:处理体积参数(支持mass转换和字符串解析) - final_volume = parse_volume_input(volume, volume_spec, mass) - debug_print(f"最终体积: {final_volume}mL") + # 解析体积(优先级:volume_spec > mass > volume) + if volume_spec and volume_spec.strip(): + final_volume = parse_volume_spec(volume_spec) + debug_print(f"使用volume_spec: {final_volume}mL") + elif mass and mass.strip(): + final_volume = parse_mass_to_volume(mass) + if final_volume > 0: + debug_print(f"使用mass转换: {final_volume}mL") + else: + final_volume = parse_volume_with_units(volume, "mL") + debug_print(f"mass转换失败,使用volume: {final_volume}mL") + else: + final_volume = parse_volume_with_units(volume, "mL") + debug_print(f"使用volume: {final_volume}mL") + + # 解析时间 + final_time = parse_time_with_units(time, "s") + debug_print(f"解析时间: {time} → {final_time}s ({final_time/60:.1f}min)") # 处理重复次数参数(repeats_spec优先) final_repeats = parse_repeats_input(repeats, repeats_spec) @@ -436,9 +612,9 @@ def generate_wash_solid_protocol( debug_print(f"搅拌速度 {stir_speed} RPM 超出范围,修正为 200 RPM") stir_speed = 200.0 if stir else 0.0 - if time < 0: - debug_print(f"时间 {time}s 无效,修正为 0") - time = 0.0 + if final_time < 0: + debug_print(f"时间 {final_time}s 无效,修正为 0") + final_time = 0.0 if final_repeats < 1: debug_print(f"重复次数 {final_repeats} 无效,修正为 1") @@ -447,9 +623,9 @@ def generate_wash_solid_protocol( debug_print(f"重复次数 {final_repeats} 过多,修正为 10") final_repeats = 10 - debug_print(f"✅ 参数处理完成") + debug_print(f"✅ 单位解析和参数处理完成") - # === 查找设备 === + # === 查找设备(保持原有逻辑)=== debug_print("步骤3: 查找设备...") try: @@ -507,17 +683,14 @@ def generate_wash_solid_protocol( debug_print(f"❌ 设备查找失败: {str(e)}") raise ValueError(f"设备查找失败: {str(e)}") - # === 执行清洗循环 === + # === 执行清洗循环(保持原有逻辑,使用解析后的参数)=== debug_print("步骤4: 执行清洗循环...") for cycle in range(final_repeats): debug_print(f"=== 第 {cycle+1}/{final_repeats} 次清洗 ===") - # 🔧 修复:分解为基础动作序列 - # 1. 加入清洗溶剂 debug_print(f" 步骤 {cycle+1}.1: 加入清洗溶剂") - # 🔧 修复:使用 pump protocol 而不是直接调用 transfer action try: from .pump_protocol import generate_pump_protocol_with_rinsing @@ -525,7 +698,7 @@ def generate_wash_solid_protocol( G=G, from_vessel=solvent_source, to_vessel=vessel, - volume=final_volume, + volume=final_volume, # 使用解析后的体积 amount="", time=0.0, viscous=False, @@ -548,22 +721,21 @@ def generate_wash_solid_protocol( except Exception as e: debug_print(f"❌ 转移失败: {str(e)}") - # 继续执行,可能有其他问题 # 2. 搅拌混合(如果需要) if stir and stirrer_device: debug_print(f" 步骤 {cycle+1}.2: 搅拌混合") - stir_time = max(time, 30.0) if time > 0 else 60.0 # 默认搅拌1分钟 + stir_time = max(final_time, 30.0) if final_time > 0 else 60.0 # 使用解析后的时间 stir_action = { "device_id": stirrer_device, "action_name": "stir", "action_kwargs": { "vessel": vessel, - "time": str(int(stir_time)), # 转换为字符串格式 + "time": str(time), # 保持原始字符串格式 "event": event, "time_spec": "", - "stir_time": stir_time, + "stir_time": stir_time, # 解析后的时间(秒) "stir_speed": stir_speed, "settling_time": 30.0 } @@ -582,7 +754,7 @@ def generate_wash_solid_protocol( "stir_speed": 0.0, "temp": temp, "continue_heatchill": False, - "volume": final_volume + "volume": final_volume # 使用解析后的体积 } } action_sequence.append(filter_action) @@ -596,11 +768,12 @@ def generate_wash_solid_protocol( # === 总结 === debug_print("=" * 60) - debug_print(f"固体清洗协议生成完成") + debug_print(f"固体清洗协议生成完成(支持单位)") debug_print(f"总动作数: {len(action_sequence)}") debug_print(f"清洗容器: {vessel}") debug_print(f"使用溶剂: {solvent}") debug_print(f"清洗体积: {final_volume}mL") + debug_print(f"清洗时间: {final_time}s ({final_time/60:.1f}min)") debug_print(f"重复次数: {final_repeats}") debug_print(f"滤液收集: {actual_filtrate_vessel}") debug_print(f"事件标识: {event}") diff --git a/unilabos/devices/virtual/virtual_heatchill.py b/unilabos/devices/virtual/virtual_heatchill.py index 541434a..20c9a19 100644 --- a/unilabos/devices/virtual/virtual_heatchill.py +++ b/unilabos/devices/virtual/virtual_heatchill.py @@ -58,12 +58,30 @@ class VirtualHeatChill: }) return True - async def heat_chill(self, vessel: str, temp: float, time: float, stir: bool, + async def heat_chill(self, vessel: str, temp: float, time, stir: bool, stir_speed: float, purpose: str) -> bool: - """Execute heat chill action - 按实际时间运行,实时更新剩余时间""" - self.logger.info(f"HeatChill: vessel={vessel}, temp={temp}°C, time={time}s, stir={stir}, stir_speed={stir_speed}") + """Execute heat chill action - 🔧 修复:确保参数类型正确""" - # 验证参数 + # 🔧 关键修复:确保所有参数类型正确 + try: + temp = float(temp) + time_value = float(time) # 强制转换为浮点数 + stir_speed = float(stir_speed) + stir = bool(stir) + vessel = str(vessel) + purpose = str(purpose) + except (ValueError, TypeError) as e: + error_msg = f"参数类型转换错误: temp={temp}({type(temp)}), time={time}({type(time)}), error={str(e)}" + self.logger.error(error_msg) + self.data.update({ + "status": f"Error: {error_msg}", + "operation_mode": "Error" + }) + return False + + self.logger.info(f"HeatChill: vessel={vessel}, temp={temp}°C, time={time_value}s, stir={stir}, stir_speed={stir_speed}") + + # 验证参数范围 if temp > self._max_temp or temp < self._min_temp: error_msg = f"温度 {temp}°C 超出范围 ({self._min_temp}°C - {self._max_temp}°C)" self.logger.error(error_msg) @@ -82,6 +100,15 @@ class VirtualHeatChill: }) return False + if time_value <= 0: + error_msg = f"时间 {time_value}s 必须大于0" + self.logger.error(error_msg) + self.data.update({ + "status": f"Error: {error_msg}", + "operation_mode": "Error" + }) + return False + # 确定操作模式 if temp > 25.0: operation_mode = "Heating" @@ -93,9 +120,9 @@ class VirtualHeatChill: operation_mode = "Maintaining" status_action = "保温" - # **修复**: 使用重命名的time模块 + # 🔧 修复:使用转换后的时间值 start_time = time_module.time() - total_time = time + total_time = time_value # 使用转换后的浮点数 # 开始操作 stir_info = f" | 搅拌: {stir_speed} RPM" if stir else "" @@ -107,9 +134,9 @@ class VirtualHeatChill: "remaining_time": total_time, }) - # **修复**: 在等待过程中每秒更新剩余时间 + # 在等待过程中每秒更新剩余时间 while True: - current_time = time_module.time() # 使用重命名的time模块 + current_time = time_module.time() elapsed = current_time - start_time remaining = max(0, total_time - elapsed) @@ -141,6 +168,21 @@ class VirtualHeatChill: async def heat_chill_start(self, vessel: str, temp: float, purpose: str) -> bool: """Start continuous heat chill""" + + # 🔧 添加类型转换 + try: + temp = float(temp) + vessel = str(vessel) + purpose = str(purpose) + except (ValueError, TypeError) as e: + error_msg = f"参数类型转换错误: {str(e)}" + self.logger.error(error_msg) + self.data.update({ + "status": f"Error: {error_msg}", + "operation_mode": "Error" + }) + return False + self.logger.info(f"HeatChillStart: vessel={vessel}, temp={temp}°C") # 验证参数 @@ -176,6 +218,15 @@ class VirtualHeatChill: async def heat_chill_stop(self, vessel: str) -> bool: """Stop heat chill""" + + # 🔧 添加类型转换 + try: + vessel = str(vessel) + except (ValueError, TypeError) as e: + error_msg = f"参数类型转换错误: {str(e)}" + self.logger.error(error_msg) + return False + self.logger.info(f"HeatChillStop: vessel={vessel}") self.data.update({ diff --git a/unilabos/devices/virtual/virtual_rotavap.py b/unilabos/devices/virtual/virtual_rotavap.py index 9f576b5..bfd6494 100644 --- a/unilabos/devices/virtual/virtual_rotavap.py +++ b/unilabos/devices/virtual/virtual_rotavap.py @@ -73,7 +73,7 @@ class VirtualRotavap: vessel: str, pressure: float = 0.1, temp: float = 60.0, - time: float = 1800.0, + time: float = 180.0, stir_speed: float = 100.0, solvent: str = "", **kwargs diff --git a/unilabos/messages/__init__.py b/unilabos/messages/__init__.py index 52fd97f..f91f382 100644 --- a/unilabos/messages/__init__.py +++ b/unilabos/messages/__init__.py @@ -125,7 +125,7 @@ class EvaporateProtocol(BaseModel): # === 所有其他参数都改为可选,添加默认值 === pressure: float = Field(0.1, description="真空度 (bar),默认0.1 bar") temp: float = Field(60.0, description="加热温度 (°C),默认60°C") - time: float = Field(1800.0, description="蒸发时间 (秒),默认1800s (30分钟)") + time: float = Field(180.0, description="蒸发时间 (秒),默认1800s (30分钟)") stir_speed: float = Field(100.0, description="旋转速度 (RPM),默认100 RPM") # === 新版XDL兼容参数(可选) === diff --git a/unilabos/registry/devices/virtual_device.yaml b/unilabos/registry/devices/virtual_device.yaml index 3a98e56..d165649 100644 --- a/unilabos/registry/devices/virtual_device.yaml +++ b/unilabos/registry/devices/virtual_device.yaml @@ -2321,7 +2321,7 @@ virtual_rotavap: pressure: 0.1 stir_speed: 100.0 temp: 60.0 - time: 1800.0 + time: 180.0 vessel: null handles: [] result: {} @@ -2341,7 +2341,7 @@ virtual_rotavap: default: 60.0 type: number time: - default: 1800.0 + default: 180.0 type: number vessel: type: string diff --git a/unilabos_msgs/action/Evaporate.action b/unilabos_msgs/action/Evaporate.action index b66e32f..9cecb62 100644 --- a/unilabos_msgs/action/Evaporate.action +++ b/unilabos_msgs/action/Evaporate.action @@ -1,10 +1,10 @@ # Organic Synthesis Station Evaporate Action -string vessel -float64 pressure -float64 temp -float64 time -float64 stir_speed -string solvent +string vessel # 目标容器 +float64 pressure # 真空度 +float64 temp # 温度 +string time # 🔧 蒸发时间(支持带单位,如"3 min","180",默认秒) +float64 stir_speed # 旋转速度 +string solvent # 溶剂名称 --- string return_info bool success diff --git a/unilabos_msgs/action/HeatChill.action b/unilabos_msgs/action/HeatChill.action index a39510e..1e7025e 100644 --- a/unilabos_msgs/action/HeatChill.action +++ b/unilabos_msgs/action/HeatChill.action @@ -1,7 +1,7 @@ # Goal - 加热冷却操作的目标参数 string vessel # 加热容器名称(必需) float64 temp # 目标温度(可选,默认25.0) -float64 time # 加热时间(可选,默认300.0) +string time # 🔧 加热时间(支持带单位,如"5 min","300",默认秒) string temp_spec # 温度规格(可选) string time_spec # 时间规格(可选) string pressure # 压力规格(可选,不做特殊处理) diff --git a/unilabos_msgs/action/Recrystallize.action b/unilabos_msgs/action/Recrystallize.action index fe727e8..2ae42bf 100644 --- a/unilabos_msgs/action/Recrystallize.action +++ b/unilabos_msgs/action/Recrystallize.action @@ -1,9 +1,9 @@ # Request -string ratio -string solvent1 -string solvent2 -string vessel -float64 volume +string ratio # 溶剂比例(如"1:1","3:7") +string solvent1 # 第一种溶剂 +string solvent2 # 第二种溶剂 +string vessel # 目标容器 +string volume # 🔧 总体积(支持带单位,如"100 mL","50",默认mL) --- # Result bool success diff --git a/unilabos_msgs/action/Stir.action b/unilabos_msgs/action/Stir.action index 64e9f04..e3e5580 100644 --- a/unilabos_msgs/action/Stir.action +++ b/unilabos_msgs/action/Stir.action @@ -1,11 +1,11 @@ # Goal - 搅拌操作的目标参数 string vessel # 搅拌容器名称(必需) -string time # 搅拌时间(如 "0.5 h", "30 min") +string time # 🔧 搅拌时间(如 "0.5 h", "30 min", "300",默认秒) string event # 事件标识(如 "A", "B") string time_spec # 时间规格(如 "several minutes") float64 stir_time # 解析后的搅拌时间(秒) float64 stir_speed # 搅拌速度(默认200.0) -float64 settling_time # 沉降时间(默认60.0) +string settling_time # 🔧 沉降时间(支持带单位,默认秒) --- # Result - 操作结果 bool success # 操作是否成功 diff --git a/unilabos_msgs/action/WashSolid.action b/unilabos_msgs/action/WashSolid.action index 8ef159d..281ca4c 100644 --- a/unilabos_msgs/action/WashSolid.action +++ b/unilabos_msgs/action/WashSolid.action @@ -1,12 +1,12 @@ # Goal - 固体清洗操作的目标参数 string vessel # 装有固体的容器名称(必需) string solvent # 清洗溶剂名称(必需) -string volume # 🔧 修改:体积(支持数字和带单位的字符串) +string volume # 🔧 体积(支持数字和带单位的字符串,如"100 mL","?") string filtrate_vessel # 滤液收集容器(可选,默认"") float64 temp # 清洗温度(可选,默认25.0) bool stir # 是否搅拌(可选,默认false) float64 stir_speed # 搅拌速度(可选,默认0.0) -float64 time # 清洗时间(可选,默认0.0) +string time # 🔧 清洗时间(支持带单位,如"5 min","300 s",默认秒) int32 repeats # 重复次数(与repeats_spec二选一) string volume_spec # 体积规格(优先级高于volume) string repeats_spec # 重复次数规格(优先级高于repeats)