diff --git a/test/experiments/comprehensive_protocol/comprehensive_station.json b/test/experiments/comprehensive_protocol/comprehensive_station.json index 43e4cc67..ed35379b 100644 --- a/test/experiments/comprehensive_protocol/comprehensive_station.json +++ b/test/experiments/comprehensive_protocol/comprehensive_station.json @@ -49,7 +49,6 @@ "config": { "protocol_type": [ "AddProtocol", - "TransferProtocol", "StartStirProtocol", "StopStirProtocol", "StirProtocol", @@ -1077,7 +1076,7 @@ "target": "solid_dispenser_1", "type": "resource", "port": { - "solid_reagent_bottle_1": "top", + "solid_reagent_bottle_1": "bottom", "solid_dispenser_1": "SolidIn" } }, @@ -1087,7 +1086,7 @@ "target": "solid_dispenser_1", "type": "resource", "port": { - "solid_reagent_bottle_2": "top", + "solid_reagent_bottle_2": "bottom", "solid_dispenser_1": "SolidIn" } }, @@ -1097,7 +1096,7 @@ "target": "solid_dispenser_1", "type": "resource", "port": { - "solid_reagent_bottle_3": "top", + "solid_reagent_bottle_3": "bottom", "solid_dispenser_1": "SolidIn" } } diff --git a/unilabos/compile/__init__.py b/unilabos/compile/__init__.py index ef6f8c69..fd848ba7 100644 --- a/unilabos/compile/__init__.py +++ b/unilabos/compile/__init__.py @@ -15,7 +15,6 @@ from .heatchill_protocol import ( generate_heat_chill_to_temp_protocol # 保留导入,但不注册为协议 ) from .stir_protocol import generate_stir_protocol, generate_start_stir_protocol, generate_stop_stir_protocol -from .transfer_protocol import generate_transfer_protocol from .clean_vessel_protocol import generate_clean_vessel_protocol from .dissolve_protocol import generate_dissolve_protocol from .filter_through_protocol import generate_filter_through_protocol @@ -54,6 +53,5 @@ action_protocol_generators = { StartStirProtocol: generate_start_stir_protocol, StirProtocol: generate_stir_protocol, StopStirProtocol: generate_stop_stir_protocol, - TransferProtocol: generate_transfer_protocol, WashSolidProtocol: generate_wash_solid_protocol, } \ No newline at end of file diff --git a/unilabos/compile/add_protocol.py b/unilabos/compile/add_protocol.py index e5f02393..c492d238 100644 --- a/unilabos/compile/add_protocol.py +++ b/unilabos/compile/add_protocol.py @@ -1,314 +1,24 @@ +from functools import partial + import networkx as nx import re import logging from typing import List, Dict, Any, Union -from .utils.vessel_parser import get_vessel + +from .utils.unit_parser import parse_volume_input, parse_mass_input, parse_time_input +from .utils.vessel_parser import get_vessel, find_solid_dispenser, find_connected_stirrer, find_reagent_vessel +from .utils.logger_util import action_log from .pump_protocol import generate_pump_protocol_with_rinsing logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"[ADD] {message}", flush=True) logger.info(f"[ADD] {message}") -def parse_volume_input(volume_input: Union[str, float]) -> float: - """ - 解析体积输入,支持带单位的字符串 - - Args: - volume_input: 体积输入(如 "2.7 mL", "2.67 mL", "?", 10.0) - - Returns: - float: 体积(毫升) - """ - if isinstance(volume_input, (int, float)): - debug_print(f"📏 体积输入为数值: {volume_input}") - return float(volume_input) - - if not volume_input or not str(volume_input).strip(): - debug_print(f"⚠️ 体积输入为空,返回0.0mL") - return 0.0 - - volume_str = str(volume_input).lower().strip() - debug_print(f"🔍 解析体积输入: '{volume_str}'") - - # 处理未知体积 - if volume_str in ['?', 'unknown', 'tbd', 'to be determined']: - default_volume = 10.0 # 默认10mL - debug_print(f"❓ 检测到未知体积,使用默认值: {default_volume}mL 🎯") - return default_volume - - # 移除空格并提取数字和单位 - volume_clean = re.sub(r'\s+', '', volume_str) - - # 匹配数字和单位的正则表达式 - match = re.match(r'([0-9]*\.?[0-9]+)\s*(ml|l|μl|ul|microliter|milliliter|liter)?', volume_clean) - - if not match: - debug_print(f"❌ 无法解析体积: '{volume_str}',使用默认值10mL") - return 10.0 - - value = float(match.group(1)) - unit = match.group(2) or 'ml' # 默认单位为毫升 - - # 转换为毫升 - if unit in ['l', 'liter']: - volume = value * 1000.0 # L -> mL - debug_print(f"🔄 体积转换: {value}L → {volume}mL") - elif unit in ['μl', 'ul', 'microliter']: - volume = value / 1000.0 # μL -> mL - debug_print(f"🔄 体积转换: {value}μL → {volume}mL") - else: # ml, milliliter 或默认 - volume = value # 已经是mL - debug_print(f"✅ 体积已为mL: {volume}mL") - - return volume - -def parse_mass_input(mass_input: Union[str, float]) -> float: - """ - 解析质量输入,支持带单位的字符串 - - Args: - mass_input: 质量输入(如 "19.3 g", "4.5 g", 2.5) - - Returns: - float: 质量(克) - """ - if isinstance(mass_input, (int, float)): - debug_print(f"⚖️ 质量输入为数值: {mass_input}g") - return float(mass_input) - - if not mass_input or not str(mass_input).strip(): - debug_print(f"⚠️ 质量输入为空,返回0.0g") - return 0.0 - - mass_str = str(mass_input).lower().strip() - debug_print(f"🔍 解析质量输入: '{mass_str}'") - - # 移除空格并提取数字和单位 - mass_clean = re.sub(r'\s+', '', mass_str) - - # 匹配数字和单位的正则表达式 - match = re.match(r'([0-9]*\.?[0-9]+)\s*(g|mg|kg|gram|milligram|kilogram)?', mass_clean) - - if not match: - debug_print(f"❌ 无法解析质量: '{mass_str}',返回0.0g") - return 0.0 - - value = float(match.group(1)) - unit = match.group(2) or 'g' # 默认单位为克 - - # 转换为克 - if unit in ['mg', 'milligram']: - mass = value / 1000.0 # mg -> g - debug_print(f"🔄 质量转换: {value}mg → {mass}g") - elif unit in ['kg', 'kilogram']: - mass = value * 1000.0 # kg -> g - debug_print(f"🔄 质量转换: {value}kg → {mass}g") - else: # g, gram 或默认 - mass = value # 已经是g - debug_print(f"✅ 质量已为g: {mass}g") - - return mass - -def parse_time_input(time_input: Union[str, float]) -> float: - """ - 解析时间输入,支持带单位的字符串 - - Args: - time_input: 时间输入(如 "1 h", "20 min", "30 s", 60.0) - - Returns: - float: 时间(秒) - """ - if isinstance(time_input, (int, float)): - debug_print(f"⏱️ 时间输入为数值: {time_input}秒") - return float(time_input) - - if not time_input or not str(time_input).strip(): - debug_print(f"⚠️ 时间输入为空,返回0秒") - return 0.0 - - time_str = str(time_input).lower().strip() - debug_print(f"🔍 解析时间输入: '{time_str}'") - - # 处理未知时间 - if time_str in ['?', 'unknown', 'tbd']: - default_time = 60.0 # 默认1分钟 - debug_print(f"❓ 检测到未知时间,使用默认值: {default_time}s (1分钟) ⏰") - return default_time - - # 移除空格并提取数字和单位 - time_clean = re.sub(r'\s+', '', time_str) - - # 匹配数字和单位的正则表达式 - match = re.match(r'([0-9]*\.?[0-9]+)\s*(s|sec|second|min|minute|h|hr|hour|d|day)?', time_clean) - - if not match: - debug_print(f"❌ 无法解析时间: '{time_str}',返回0s") - return 0.0 - - value = float(match.group(1)) - unit = match.group(2) or 's' # 默认单位为秒 - - # 转换为秒 - if unit in ['min', 'minute']: - time_sec = value * 60.0 # min -> s - debug_print(f"🔄 时间转换: {value}分钟 → {time_sec}秒") - elif unit in ['h', 'hr', 'hour']: - time_sec = value * 3600.0 # h -> s - debug_print(f"🔄 时间转换: {value}小时 → {time_sec}秒") - elif unit in ['d', 'day']: - time_sec = value * 86400.0 # d -> s - debug_print(f"🔄 时间转换: {value}天 → {time_sec}秒") - else: # s, sec, second 或默认 - time_sec = value # 已经是s - debug_print(f"✅ 时间已为秒: {time_sec}秒") - - return time_sec - -def find_reagent_vessel(G: nx.DiGraph, reagent: str) -> str: - """增强版试剂容器查找,支持固体和液体""" - debug_print(f"🔍 开始查找试剂 '{reagent}' 的容器...") - - # 🔧 方法1:直接搜索 data.reagent_name 和 config.reagent - debug_print(f"📋 方法1: 搜索reagent字段...") - for node in G.nodes(): - node_data = G.nodes[node].get('data', {}) - node_type = G.nodes[node].get('type', '') - config_data = G.nodes[node].get('config', {}) - - # 只搜索容器类型的节点 - if node_type == 'container': - reagent_name = node_data.get('reagent_name', '').lower() - config_reagent = config_data.get('reagent', '').lower() - - # 精确匹配 - if reagent_name == reagent.lower() or config_reagent == reagent.lower(): - debug_print(f"✅ 通过reagent字段精确匹配到容器: {node} 🎯") - return node - - # 模糊匹配 - if (reagent.lower() in reagent_name and reagent_name) or \ - (reagent.lower() in config_reagent and config_reagent): - debug_print(f"✅ 通过reagent字段模糊匹配到容器: {node} 🔍") - return node - - # 🔧 方法2:常见的容器命名规则 - debug_print(f"📋 方法2: 使用命名规则查找...") - reagent_clean = reagent.lower().replace(' ', '_').replace('-', '_') - possible_names = [ - reagent_clean, - f"flask_{reagent_clean}", - f"bottle_{reagent_clean}", - f"vessel_{reagent_clean}", - f"{reagent_clean}_flask", - f"{reagent_clean}_bottle", - f"reagent_{reagent_clean}", - f"reagent_bottle_{reagent_clean}", - f"solid_reagent_bottle_{reagent_clean}", - f"reagent_bottle_1", # 通用试剂瓶 - f"reagent_bottle_2", - f"reagent_bottle_3" - ] - - debug_print(f"🔍 尝试的容器名称: {possible_names[:5]}... (共{len(possible_names)}个)") - - for name in possible_names: - if name in G.nodes(): - node_type = G.nodes[name].get('type', '') - if node_type == 'container': - debug_print(f"✅ 通过命名规则找到容器: {name} 📝") - return name - - # 🔧 方法3:节点名称模糊匹配 - debug_print(f"📋 方法3: 节点名称模糊匹配...") - for node_id in G.nodes(): - node_data = G.nodes[node_id] - if node_data.get('type') == 'container': - # 检查节点名称是否包含试剂名称 - if reagent_clean in node_id.lower(): - debug_print(f"✅ 通过节点名称模糊匹配到容器: {node_id} 🔍") - return node_id - - # 检查液体类型匹配 - vessel_data = node_data.get('data', {}) - liquids = vessel_data.get('liquid', []) - for liquid in liquids: - if isinstance(liquid, dict): - liquid_type = liquid.get('liquid_type') or liquid.get('name', '') - if liquid_type.lower() == reagent.lower(): - debug_print(f"✅ 通过液体类型匹配到容器: {node_id} 💧") - return node_id - - # 🔧 方法4:使用第一个试剂瓶作为备选 - debug_print(f"📋 方法4: 查找备选试剂瓶...") - for node_id in G.nodes(): - node_data = G.nodes[node_id] - if (node_data.get('type') == 'container' and - ('reagent' in node_id.lower() or 'bottle' in node_id.lower())): - debug_print(f"⚠️ 未找到专用容器,使用备选试剂瓶: {node_id} 🔄") - return node_id - - debug_print(f"❌ 所有方法都失败了,无法找到容器!") - raise ValueError(f"找不到试剂 '{reagent}' 对应的容器") - -def find_connected_stirrer(G: nx.DiGraph, vessel: str) -> str: - """查找连接到指定容器的搅拌器""" - debug_print(f"🔍 查找连接到容器 '{vessel}' 的搅拌器...") - - stirrer_nodes = [] - for node in G.nodes(): - node_class = G.nodes[node].get('class', '').lower() - if 'stirrer' in node_class: - stirrer_nodes.append(node) - debug_print(f"📋 发现搅拌器: {node}") - - debug_print(f"📊 共找到 {len(stirrer_nodes)} 个搅拌器") - - # 查找连接到容器的搅拌器 - for stirrer in stirrer_nodes: - if G.has_edge(stirrer, vessel) or G.has_edge(vessel, stirrer): - debug_print(f"✅ 找到连接的搅拌器: {stirrer} 🔗") - return stirrer - - # 返回第一个搅拌器 - if stirrer_nodes: - debug_print(f"⚠️ 未找到直接连接的搅拌器,使用第一个: {stirrer_nodes[0]} 🔄") - return stirrer_nodes[0] - - debug_print(f"❌ 未找到任何搅拌器") - return "" - -def find_solid_dispenser(G: nx.DiGraph) -> str: - """查找固体加样器""" - debug_print(f"🔍 查找固体加样器...") - - for node in G.nodes(): - node_class = G.nodes[node].get('class', '').lower() - if 'solid_dispenser' in node_class or 'dispenser' in node_class: - debug_print(f"✅ 找到固体加样器: {node} 🥄") - return node - - debug_print(f"❌ 未找到固体加样器") - return "" # 🆕 创建进度日志动作 -def create_action_log(message: str, emoji: str = "📝") -> Dict[str, Any]: - """创建一个动作日志""" - full_message = f"{emoji} {message}" - debug_print(full_message) - logger.info(full_message) - print(f"[ACTION] {full_message}", flush=True) - - return { - "action_name": "wait", - "action_kwargs": { - "time": 0.1, - "log_message": full_message - } - } +create_action_log = partial(action_log, prefix="[ADD]") def generate_add_protocol( G: nx.DiGraph, @@ -398,12 +108,7 @@ def generate_add_protocol( final_time = parse_time_input(time) debug_print(f"📊 解析结果:") - debug_print(f" 📏 体积: {final_volume}mL") - debug_print(f" ⚖️ 质量: {final_mass}g") - debug_print(f" ⏱️ 时间: {final_time}s") - debug_print(f" 🧬 摩尔: '{mol}'") - debug_print(f" 🎯 事件: '{event}'") - debug_print(f" ⚡ 速率: '{rate_spec}'") + debug_print(f" 体积: {final_volume}mL, 质量: {final_mass}g, 时间: {final_time}s, 摩尔: '{mol}', 事件: '{event}', 速率: '{rate_spec}'") # === 判断添加类型 === debug_print("🔍 步骤3: 判断添加类型...") diff --git a/unilabos/compile/adjustph_protocol.py b/unilabos/compile/adjustph_protocol.py index f6db7ba2..1207cda5 100644 --- a/unilabos/compile/adjustph_protocol.py +++ b/unilabos/compile/adjustph_protocol.py @@ -8,25 +8,8 @@ logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"[ADJUST_PH] {message}", flush=True) logger.info(f"[ADJUST_PH] {message}") -# 🆕 创建进度日志动作 -def create_action_log(message: str, emoji: str = "📝") -> Dict[str, Any]: - """创建一个动作日志""" - full_message = f"{emoji} {message}" - debug_print(full_message) - logger.info(full_message) - print(f"[ACTION] {full_message}", flush=True) - - return { - "action_name": "wait", - "action_kwargs": { - "time": 0.1, - "log_message": full_message - } - } - def find_acid_base_vessel(G: nx.DiGraph, reagent: str) -> str: """ 查找酸碱试剂容器,支持多种匹配模式 diff --git a/unilabos/compile/clean_vessel_protocol.py b/unilabos/compile/clean_vessel_protocol.py index 10d2e690..b9f903c8 100644 --- a/unilabos/compile/clean_vessel_protocol.py +++ b/unilabos/compile/clean_vessel_protocol.py @@ -1,102 +1,9 @@ from typing import List, Dict, Any import networkx as nx -from .utils.vessel_parser import get_vessel +from .utils.vessel_parser import get_vessel, find_solvent_vessel from .pump_protocol import generate_pump_protocol -def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: - """ - 查找溶剂容器,支持多种匹配模式: - 1. 容器名称匹配(如 flask_water, reagent_bottle_1-DMF) - 2. 容器内液体类型匹配(如 liquid_type: "DMF", "ethanol") - """ - print(f"CLEAN_VESSEL: 正在查找溶剂 '{solvent}' 的容器...") - - # 第一步:通过容器名称匹配 - possible_names = [ - f"flask_{solvent}", # flask_water, flask_ethanol - f"bottle_{solvent}", # bottle_water, bottle_ethanol - f"vessel_{solvent}", # vessel_water, vessel_ethanol - f"{solvent}_flask", # water_flask, ethanol_flask - f"{solvent}_bottle", # water_bottle, ethanol_bottle - f"{solvent}", # 直接用溶剂名 - f"solvent_{solvent}", # solvent_water, solvent_ethanol - f"reagent_bottle_{solvent}", # reagent_bottle_DMF - ] - - # 尝试名称匹配 - for vessel_name in possible_names: - if vessel_name in G.nodes(): - print(f"CLEAN_VESSEL: 通过名称匹配找到容器: {vessel_name}") - return vessel_name - - # 第二步:通过模糊名称匹配(名称中包含溶剂名) - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - # 检查节点ID或名称中是否包含溶剂名 - node_name = G.nodes[node_id].get('name', '').lower() - if (solvent.lower() in node_id.lower() or - solvent.lower() in node_name): - print(f"CLEAN_VESSEL: 通过模糊名称匹配找到容器: {node_id} (名称: {node_name})") - return node_id - - # 第三步:通过液体类型匹配 - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - vessel_data = G.nodes[node_id].get('data', {}) - liquids = vessel_data.get('liquid', []) - - for liquid in liquids: - if isinstance(liquid, dict): - # 支持两种格式的液体类型字段 - liquid_type = liquid.get('liquid_type') or liquid.get('name', '') - reagent_name = vessel_data.get('reagent_name', '') - config_reagent = G.nodes[node_id].get('config', {}).get('reagent', '') - - # 检查多个可能的字段 - if (liquid_type.lower() == solvent.lower() or - reagent_name.lower() == solvent.lower() or - config_reagent.lower() == solvent.lower()): - print(f"CLEAN_VESSEL: 通过液体类型匹配找到容器: {node_id}") - print(f" - liquid_type: {liquid_type}") - print(f" - reagent_name: {reagent_name}") - print(f" - config.reagent: {config_reagent}") - return node_id - - # 第四步:列出所有可用的容器信息帮助调试 - available_containers = [] - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - vessel_data = G.nodes[node_id].get('data', {}) - config_data = G.nodes[node_id].get('config', {}) - liquids = vessel_data.get('liquid', []) - - container_info = { - 'id': node_id, - 'name': G.nodes[node_id].get('name', ''), - 'liquid_types': [], - 'reagent_name': vessel_data.get('reagent_name', ''), - 'config_reagent': config_data.get('reagent', '') - } - - for liquid in liquids: - if isinstance(liquid, dict): - liquid_type = liquid.get('liquid_type') or liquid.get('name', '') - if liquid_type: - container_info['liquid_types'].append(liquid_type) - - available_containers.append(container_info) - - print(f"CLEAN_VESSEL: 可用容器列表:") - for container in available_containers: - print(f" - {container['id']}: {container['name']}") - print(f" 液体类型: {container['liquid_types']}") - print(f" 试剂名称: {container['reagent_name']}") - print(f" 配置试剂: {container['config_reagent']}") - - raise ValueError(f"未找到溶剂 '{solvent}' 的容器。尝试了名称匹配: {possible_names}") - - def find_solvent_vessel_by_any_match(G: nx.DiGraph, solvent: str) -> str: """ 增强版溶剂容器查找,支持各种匹配方式的别名函数 diff --git a/unilabos/compile/dissolve_protocol.py b/unilabos/compile/dissolve_protocol.py index 3a504e25..d57b7eb5 100644 --- a/unilabos/compile/dissolve_protocol.py +++ b/unilabos/compile/dissolve_protocol.py @@ -1,33 +1,22 @@ +from functools import partial + import networkx as nx import re import logging from typing import List, Dict, Any, Union -from unilabos.compile.utils.vessel_parser import get_vessel +from .utils.vessel_parser import get_vessel +from .utils.logger_util import action_log from .pump_protocol import generate_pump_protocol_with_rinsing logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"[DISSOLVE] {message}", flush=True) logger.info(f"[DISSOLVE] {message}") # 🆕 创建进度日志动作 -def create_action_log(message: str, emoji: str = "📝") -> Dict[str, Any]: - """创建一个动作日志""" - full_message = f"{emoji} {message}" - debug_print(full_message) - logger.info(full_message) - print(f"[ACTION] {full_message}", flush=True) - - return { - "action_name": "wait", - "action_kwargs": { - "time": 0.1, - "log_message": full_message - } - } +create_action_log = partial(action_log, prefix="[DISSOLVE]") def parse_volume_input(volume_input: Union[str, float]) -> float: """ diff --git a/unilabos/compile/evacuateandrefill_protocol.py b/unilabos/compile/evacuateandrefill_protocol.py index 5e6a4b48..bef4b923 100644 --- a/unilabos/compile/evacuateandrefill_protocol.py +++ b/unilabos/compile/evacuateandrefill_protocol.py @@ -1,9 +1,12 @@ +from functools import partial + import networkx as nx import logging import uuid import sys from typing import List, Dict, Any, Optional from .utils.vessel_parser import get_vessel +from .utils.logger_util import action_log from .pump_protocol import generate_pump_protocol_with_rinsing, generate_pump_protocol # 设置日志 @@ -22,48 +25,17 @@ def debug_print(message): try: # 确保消息是字符串格式 safe_message = str(message) - print(f"[抽真空充气] {safe_message}", flush=True) logger.info(f"[抽真空充气] {safe_message}") except UnicodeEncodeError: # 如果编码失败,尝试替换不支持的字符 safe_message = str(message).encode('utf-8', errors='replace').decode('utf-8') - print(f"[抽真空充气] {safe_message}", flush=True) logger.info(f"[抽真空充气] {safe_message}") except Exception as e: # 最后的安全措施 fallback_message = f"日志输出错误: {repr(message)}" - print(f"[抽真空充气] {fallback_message}", flush=True) logger.info(f"[抽真空充气] {fallback_message}") -def create_action_log(message: str, emoji: str = "📝") -> Dict[str, Any]: - """创建一个动作日志 - 支持中文和emoji""" - try: - full_message = f"{emoji} {message}" - debug_print(full_message) - logger.info(full_message) - - return { - "action_name": "wait", - "action_kwargs": { - "time": 0.1, - "log_message": full_message, - "progress_message": full_message - } - } - except Exception as e: - # 如果emoji有问题,使用纯文本 - safe_message = f"[日志] {message}" - debug_print(safe_message) - logger.info(safe_message) - - return { - "action_name": "wait", - "action_kwargs": { - "time": 0.1, - "log_message": safe_message, - "progress_message": safe_message - } - } +create_action_log = partial(action_log, prefix="[抽真空充气]") def find_gas_source(G: nx.DiGraph, gas: str) -> str: """ diff --git a/unilabos/compile/evaporate_protocol.py b/unilabos/compile/evaporate_protocol.py index 33bae762..ba396c13 100644 --- a/unilabos/compile/evaporate_protocol.py +++ b/unilabos/compile/evaporate_protocol.py @@ -3,75 +3,14 @@ import networkx as nx import logging import re from .utils.vessel_parser import get_vessel +from .utils.unit_parser import parse_time_input logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"🧪 [EVAPORATE] {message}", flush=True) logger.info(f"[EVAPORATE] {message}") -def parse_time_input(time_input: Union[str, float]) -> float: - """ - 解析时间输入,支持带单位的字符串 - - Args: - time_input: 时间输入(如 "3 min", "180", "0.5 h" 等) - - Returns: - float: 时间(秒) - """ - if isinstance(time_input, (int, float)): - debug_print(f"⏱️ 时间输入为数字: {time_input}s ✨") - return float(time_input) # 🔧 确保返回float - - if not time_input or not str(time_input).strip(): - debug_print(f"⚠️ 时间输入为空,使用默认值: 180s (3分钟) 🕐") - return 180.0 # 默认3分钟 - - time_str = str(time_input).lower().strip() - debug_print(f"🔍 解析时间输入: '{time_str}' 📝") - - # 处理未知时间 - if time_str in ['?', 'unknown', 'tbd']: - default_time = 180.0 # 默认3分钟 - debug_print(f"❓ 检测到未知时间,使用默认值: {default_time}s (3分钟) 🤷‍♀️") - return default_time - - # 移除空格并提取数字和单位 - time_clean = re.sub(r'\s+', '', time_str) - - # 匹配数字和单位的正则表达式 - match = re.match(r'([0-9]*\.?[0-9]+)\s*(s|sec|second|min|minute|h|hr|hour|d|day)?', time_clean) - - if not match: - # 如果无法解析,尝试直接转换为数字(默认秒) - try: - value = float(time_str) - debug_print(f"✅ 时间解析成功: {time_str} → {value}s(无单位,默认秒)⏰") - return float(value) # 🔧 确保返回float - except ValueError: - debug_print(f"❌ 无法解析时间: '{time_str}',使用默认值180s (3分钟) 😅") - return 180.0 - - value = float(match.group(1)) - unit = match.group(2) or 's' # 默认单位为秒 - - # 转换为秒 - if unit in ['min', 'minute']: - time_sec = value * 60.0 # min -> s - debug_print(f"🕐 时间转换: {value} 分钟 → {time_sec}s ⏰") - elif unit in ['h', 'hr', 'hour']: - time_sec = value * 3600.0 # h -> s - debug_print(f"🕐 时间转换: {value} 小时 → {time_sec}s ({time_sec/60:.1f}分钟) ⏰") - elif unit in ['d', 'day']: - time_sec = value * 86400.0 # d -> s - debug_print(f"🕐 时间转换: {value} 天 → {time_sec}s ({time_sec/3600:.1f}小时) ⏰") - else: # s, sec, second 或默认 - time_sec = value # 已经是s - debug_print(f"🕐 时间转换: {value}s → {time_sec}s (已是秒) ⏰") - - return float(time_sec) # 🔧 确保返回float def find_rotavap_device(G: nx.DiGraph, vessel: str = None) -> Optional[str]: """ diff --git a/unilabos/compile/filter_protocol.py b/unilabos/compile/filter_protocol.py index cf6d5070..8e5efd96 100644 --- a/unilabos/compile/filter_protocol.py +++ b/unilabos/compile/filter_protocol.py @@ -8,7 +8,6 @@ logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"🧪 [FILTER] {message}", flush=True) logger.info(f"[FILTER] {message}") def find_filter_device(G: nx.DiGraph) -> str: @@ -52,7 +51,7 @@ def validate_vessel(G: nx.DiGraph, vessel: str, vessel_type: str = "容器") -> def generate_filter_protocol( G: nx.DiGraph, vessel: dict, # 🔧 修改:从字符串改为字典类型 - filtrate_vessel: str = "", + filtrate_vessel: dict = {"id": "waste"}, **kwargs ) -> List[Dict[str, Any]]: """ @@ -70,6 +69,7 @@ def generate_filter_protocol( # 🔧 核心修改:从字典中提取容器ID vessel_id, vessel_data = get_vessel(vessel) + filtrate_vessel_id, filtrate_vessel_data = get_vessel(filtrate_vessel) debug_print("🌊" * 20) debug_print("🚀 开始生成过滤协议(支持体积运算)✨") @@ -103,7 +103,7 @@ def generate_filter_protocol( # 验证可选参数 debug_print(" 🔍 验证可选参数...") if filtrate_vessel: - validate_vessel(G, filtrate_vessel, "滤液容器") + validate_vessel(G, filtrate_vessel_id, "滤液容器") debug_print(" 🌊 模式: 过滤并收集滤液 💧") else: debug_print(" 🧱 模式: 过滤并收集固体 🔬") @@ -213,7 +213,7 @@ def generate_filter_protocol( debug_print(" ⚙️ 构建过滤参数...") filter_kwargs = { "vessel": filter_device, # 过滤器设备 - "filtrate_vessel": filtrate_vessel, # 滤液容器(可能为空) + "filtrate_vessel": filtrate_vessel_id, # 滤液容器(可能为空) "stir": kwargs.get("stir", False), "stir_speed": kwargs.get("stir_speed", 0.0), "temp": kwargs.get("temp", 25.0), @@ -245,7 +245,7 @@ def generate_filter_protocol( debug_print("📍 步骤5: 收集滤液... 💧") if filtrate_vessel: - debug_print(f" 🧪 收集滤液: {filter_device} → {filtrate_vessel} 💧") + debug_print(f" 🧪 收集滤液: {filter_device} → {filtrate_vessel_id} 💧") try: debug_print(" 🔄 开始执行收集操作...") @@ -274,20 +274,20 @@ def generate_filter_protocol( debug_print(" 🔧 更新滤液容器体积...") # 更新filtrate_vessel在图中的体积(如果它是节点) - if filtrate_vessel in G.nodes(): - if 'data' not in G.nodes[filtrate_vessel]: - G.nodes[filtrate_vessel]['data'] = {} + if filtrate_vessel_id in G.nodes(): + if 'data' not in G.nodes[filtrate_vessel_id]: + G.nodes[filtrate_vessel_id]['data'] = {} - current_filtrate_volume = G.nodes[filtrate_vessel]['data'].get('liquid_volume', 0.0) + current_filtrate_volume = G.nodes[filtrate_vessel_id]['data'].get('liquid_volume', 0.0) if isinstance(current_filtrate_volume, list): if len(current_filtrate_volume) > 0: - G.nodes[filtrate_vessel]['data']['liquid_volume'][0] += expected_filtrate_volume + G.nodes[filtrate_vessel_id]['data']['liquid_volume'][0] += expected_filtrate_volume else: - G.nodes[filtrate_vessel]['data']['liquid_volume'] = [expected_filtrate_volume] + G.nodes[filtrate_vessel_id]['data']['liquid_volume'] = [expected_filtrate_volume] else: - G.nodes[filtrate_vessel]['data']['liquid_volume'] = current_filtrate_volume + expected_filtrate_volume + G.nodes[filtrate_vessel_id]['data']['liquid_volume'] = current_filtrate_volume + expected_filtrate_volume - debug_print(f" 📊 滤液容器 {filtrate_vessel} 体积增加 {expected_filtrate_volume:.2f}mL") + debug_print(f" 📊 滤液容器 {filtrate_vessel_id} 体积增加 {expected_filtrate_volume:.2f}mL") else: debug_print(" ⚠️ 收集协议返回空序列 🤔") @@ -352,7 +352,7 @@ def generate_filter_protocol( debug_print(f"📊 总动作数: {len(action_sequence)} 个 📝") debug_print(f"🥽 过滤容器: {vessel_id} 🧪") debug_print(f"🌊 过滤器设备: {filter_device} 🔧") - debug_print(f"💧 滤液容器: {filtrate_vessel or '无(保留固体)'} 🧱") + debug_print(f"💧 滤液容器: {filtrate_vessel_id or '无(保留固体)'} 🧱") debug_print(f"⏱️ 预计总时间: {(len(action_sequence) * 5):.0f} 秒 ⌛") if original_liquid_volume > 0: debug_print(f"📊 体积变化统计:") @@ -364,4 +364,3 @@ def generate_filter_protocol( debug_print("🎊" * 20) return action_sequence - diff --git a/unilabos/compile/heatchill_protocol.py b/unilabos/compile/heatchill_protocol.py index 3165a376..6b0e3e38 100644 --- a/unilabos/compile/heatchill_protocol.py +++ b/unilabos/compile/heatchill_protocol.py @@ -3,81 +3,14 @@ import networkx as nx import logging import re from .utils.vessel_parser import get_vessel +from .utils.unit_parser import parse_time_input logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"🌡️ [HEATCHILL] {message}", flush=True) logger.info(f"[HEATCHILL] {message}") -def parse_time_input(time_input: Union[str, float, int]) -> float: - """ - 解析时间输入(统一函数) - - Args: - time_input: 时间输入(如 "30 min", "1 h", "300", "?", 60.0) - - Returns: - float: 时间(秒) - """ - if not time_input: - return 300.0 - - # 🔢 处理数值输入 - if isinstance(time_input, (int, float)): - result = float(time_input) - debug_print(f"⏰ 数值时间: {time_input} → {result}s") - return result - - # 📝 处理字符串输入 - time_str = str(time_input).lower().strip() - debug_print(f"🔍 解析时间: '{time_str}'") - - # ❓ 特殊值处理 - special_times = { - '?': 300.0, 'unknown': 300.0, 'tbd': 300.0, - 'overnight': 43200.0, 'several hours': 10800.0, - 'few hours': 7200.0, 'long time': 3600.0, 'short time': 300.0 - } - - if time_str in special_times: - result = special_times[time_str] - debug_print(f"🎯 特殊时间: '{time_str}' → {result}s ({result/60:.1f}分钟)") - return result - - # 🔢 纯数字处理 - try: - result = float(time_str) - debug_print(f"⏰ 纯数字: {time_str} → {result}s") - return result - except ValueError: - pass - - # 📐 正则表达式解析 - pattern = r'(\d+\.?\d*)\s*([a-z]*)' - match = re.match(pattern, time_str) - - if not match: - debug_print(f"⚠️ 无法解析时间: '{time_str}',使用默认值: 300s") - return 300.0 - - value = float(match.group(1)) - unit = match.group(2) or 's' - - # 📏 单位转换 - unit_multipliers = { - 's': 1.0, 'sec': 1.0, 'second': 1.0, 'seconds': 1.0, - 'm': 60.0, 'min': 60.0, 'mins': 60.0, 'minute': 60.0, 'minutes': 60.0, - 'h': 3600.0, 'hr': 3600.0, 'hrs': 3600.0, 'hour': 3600.0, 'hours': 3600.0, - 'd': 86400.0, 'day': 86400.0, 'days': 86400.0 - } - - multiplier = unit_multipliers.get(unit, 1.0) - result = value * multiplier - - debug_print(f"✅ 时间解析: '{time_str}' → {value} {unit} → {result}s ({result/60:.1f}分钟)") - return result def parse_temp_input(temp_input: Union[str, float], default_temp: float = 25.0) -> float: """ @@ -287,7 +220,6 @@ def generate_heat_chill_protocol( "device_id": heatchill_id, "action_name": "heat_chill", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id "temp": float(final_temp), "time": float(final_time), "stir": bool(stir), @@ -321,7 +253,7 @@ def generate_heat_chill_to_temp_protocol( **kwargs ) -> List[Dict[str, Any]]: """生成加热到指定温度的协议(简化版)""" - vessel_id = vessel["id"] + vessel_id, _ = get_vessel(vessel) debug_print(f"🌡️ 生成加热到温度协议: {vessel_id} → {temp}°C") return generate_heat_chill_protocol(G, vessel, temp, time, **kwargs) @@ -335,7 +267,7 @@ def generate_heat_chill_start_protocol( """生成开始加热操作的协议序列""" # 🔧 核心修改:从字典中提取容器ID - vessel_id = vessel["id"] + vessel_id, _ = get_vessel(vessel) debug_print("🔥 开始生成启动加热协议 ✨") debug_print(f"🥽 vessel: {vessel} (ID: {vessel_id}), 🌡️ temp: {temp}°C") @@ -353,7 +285,6 @@ def generate_heat_chill_start_protocol( "device_id": heatchill_id, "action_name": "heat_chill_start", "action_kwargs": { - "vessel": vessel_id, # 🔧 使用 vessel_id "temp": temp, "purpose": purpose or f"开始加热到 {temp}°C" } @@ -370,7 +301,7 @@ def generate_heat_chill_stop_protocol( """生成停止加热操作的协议序列""" # 🔧 核心修改:从字典中提取容器ID - vessel_id = vessel["id"] + vessel_id, _ = get_vessel(vessel) debug_print("🛑 开始生成停止加热协议 ✨") debug_print(f"🥽 vessel: {vessel} (ID: {vessel_id})") @@ -388,10 +319,8 @@ def generate_heat_chill_stop_protocol( "device_id": heatchill_id, "action_name": "heat_chill_stop", "action_kwargs": { - "vessel": vessel_id # 🔧 使用 vessel_id } }] debug_print(f"✅ 停止加热协议生成完成 🎯") return action_sequence - diff --git a/unilabos/compile/pump_protocol.py b/unilabos/compile/pump_protocol.py index a54218e3..f8297f41 100644 --- a/unilabos/compile/pump_protocol.py +++ b/unilabos/compile/pump_protocol.py @@ -10,11 +10,7 @@ logger = logging.getLogger(__name__) def debug_print(message): """强制输出调试信息""" - timestamp = time_module.strftime("%H:%M:%S") - output = f"[{timestamp}] {message}" - print(output, flush=True) - sys.stdout.flush() - # 同时写入日志 + output = f"[TRANSFER] {message}" logger.info(output) def get_vessel_liquid_volume(G: nx.DiGraph, vessel: str) -> float: diff --git a/unilabos/compile/recrystallize_protocol.py b/unilabos/compile/recrystallize_protocol.py index 736a9af2..61653e23 100644 --- a/unilabos/compile/recrystallize_protocol.py +++ b/unilabos/compile/recrystallize_protocol.py @@ -3,91 +3,16 @@ import re import logging from typing import List, Dict, Any, Tuple, Union from .utils.vessel_parser import get_vessel +from .utils.unit_parser import parse_volume_input from .pump_protocol import generate_pump_protocol_with_rinsing logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"💎 [RECRYSTALLIZE] {message}", flush=True) logger.info(f"[RECRYSTALLIZE] {message}") -def parse_volume_with_units(volume_input: Union[str, float, int], default_unit: str = "mL") -> float: - """ - 解析带单位的体积输入 - - Args: - volume_input: 体积输入(如 "100 mL", "2.5 L", "500", "?", 100.0) - default_unit: 默认单位(默认为毫升) - - Returns: - float: 体积(毫升) - """ - if not volume_input: - debug_print("⚠️ 体积输入为空,返回 0.0mL 📦") - return 0.0 - - # 处理数值输入 - if isinstance(volume_input, (int, float)): - result = float(volume_input) - debug_print(f"🔢 数值体积输入: {volume_input} → {result}mL(默认单位)💧") - return result - - # 处理字符串输入 - volume_str = str(volume_input).lower().strip() - debug_print(f"🔍 解析体积字符串: '{volume_str}' 📝") - - # 处理特殊值 - if volume_str in ['?', 'unknown', 'tbd', 'to be determined']: - default_volume = 50.0 # 50mL默认值 - debug_print(f"❓ 检测到未知体积,使用默认值: {default_volume}mL 🎯") - return default_volume - - # 如果是纯数字,使用默认单位 - try: - value = float(volume_str) - if default_unit.lower() in ["ml", "milliliter"]: - result = value - elif default_unit.lower() in ["l", "liter"]: - result = value * 1000.0 - elif default_unit.lower() in ["μl", "ul", "microliter"]: - result = value / 1000.0 - else: - result = value # 默认mL - debug_print(f"🔢 纯数字输入: {volume_str} → {result}mL(单位: {default_unit})📏") - return result - except ValueError: - pass - - # 移除空格并提取数字和单位 - volume_clean = re.sub(r'\s+', '', volume_str) - - # 匹配数字和单位的正则表达式 - match = re.match(r'([0-9]*\.?[0-9]+)\s*(ml|l|μl|ul|microliter|milliliter|liter)?', volume_clean) - - if not match: - debug_print(f"⚠️ 无法解析体积: '{volume_str}',使用默认值: 50mL 🎯") - return 50.0 - - value = float(match.group(1)) - unit = match.group(2) or default_unit.lower() - - # 转换为毫升 - if unit in ['l', 'liter']: - volume = value * 1000.0 # L -> mL - debug_print(f"📏 升转毫升: {value}L → {volume}mL 💧") - elif unit in ['μl', 'ul', 'microliter']: - volume = value / 1000.0 # μL -> mL - debug_print(f"📏 微升转毫升: {value}μL → {volume}mL 💧") - else: # ml, milliliter 或默认 - volume = value # 已经是mL - debug_print(f"📏 毫升单位: {value}mL → {volume}mL 💧") - - debug_print(f"✅ 体积解析完成: '{volume_str}' → {volume}mL ✨") - return volume - - def parse_ratio(ratio_str: str) -> Tuple[float, float]: """ 解析比例字符串,支持多种格式 @@ -137,131 +62,6 @@ def parse_ratio(ratio_str: str) -> Tuple[float, float]: return 1.0, 1.0 -def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: - """ - 查找溶剂容器 - - Args: - G: 网络图 - solvent: 溶剂名称 - - Returns: - str: 溶剂容器ID - """ - debug_print(f"🔍 正在查找溶剂 '{solvent}' 的容器... 🧪") - - # 构建可能的容器名称 - possible_names = [ - f"flask_{solvent}", - f"bottle_{solvent}", - f"reagent_{solvent}", - f"reagent_bottle_{solvent}", - f"{solvent}_flask", - f"{solvent}_bottle", - f"{solvent}", - f"vessel_{solvent}", - ] - - debug_print(f"📋 候选容器名称: {possible_names[:3]}... (共{len(possible_names)}个) 📝") - - # 第一步:通过容器名称匹配 - debug_print(" 🎯 步骤1: 精确名称匹配...") - for vessel_name in possible_names: - if vessel_name in G.nodes(): - debug_print(f" 🎉 通过名称匹配找到容器: {vessel_name} ✨") - return vessel_name - - # 第二步:通过模糊匹配(节点ID和名称) - debug_print(" 🔍 步骤2: 模糊名称匹配...") - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - node_name = G.nodes[node_id].get('name', '').lower() - - if solvent.lower() in node_id.lower() or solvent.lower() in node_name: - debug_print(f" 🎉 通过模糊匹配找到容器: {node_id} (名称: {node_name}) ✨") - return node_id - - # 第三步:通过配置中的试剂信息匹配 - debug_print(" 🧪 步骤3: 配置试剂信息匹配...") - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - # 检查 config 中的 reagent 字段 - node_config = G.nodes[node_id].get('config', {}) - config_reagent = node_config.get('reagent', '').lower() - - if config_reagent and solvent.lower() == config_reagent: - debug_print(f" 🎉 通过config.reagent匹配找到容器: {node_id} (试剂: {config_reagent}) ✨") - return node_id - - # 第四步:通过数据中的试剂信息匹配 - debug_print(" 🧪 步骤4: 数据试剂信息匹配...") - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - vessel_data = G.nodes[node_id].get('data', {}) - - # 检查 data 中的 reagent_name 字段 - reagent_name = vessel_data.get('reagent_name', '').lower() - if reagent_name and solvent.lower() == reagent_name: - debug_print(f" 🎉 通过data.reagent_name匹配找到容器: {node_id} (试剂: {reagent_name}) ✨") - return node_id - - # 检查 data 中的液体信息 - liquids = vessel_data.get('liquid', []) - for liquid in liquids: - if isinstance(liquid, dict): - liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower() - - if solvent.lower() in liquid_type: - debug_print(f" 🎉 通过液体类型匹配找到容器: {node_id} (液体类型: {liquid_type}) ✨") - return node_id - - # 第五步:部分匹配(如果前面都没找到) - debug_print(" 🔍 步骤5: 部分匹配...") - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - node_config = G.nodes[node_id].get('config', {}) - node_data = G.nodes[node_id].get('data', {}) - node_name = G.nodes[node_id].get('name', '').lower() - - config_reagent = node_config.get('reagent', '').lower() - data_reagent = node_data.get('reagent_name', '').lower() - - # 检查是否包含溶剂名称 - if (solvent.lower() in config_reagent or - solvent.lower() in data_reagent or - solvent.lower() in node_name or - solvent.lower() in node_id.lower()): - debug_print(f" 🎉 通过部分匹配找到容器: {node_id} ✨") - debug_print(f" - 节点名称: {node_name}") - debug_print(f" - 配置试剂: {config_reagent}") - debug_print(f" - 数据试剂: {data_reagent}") - return node_id - - # 调试信息:列出所有容器 - debug_print(" 🔎 调试信息:列出所有容器...") - container_list = [] - for node_id in G.nodes(): - if G.nodes[node_id].get('type') == 'container': - node_config = G.nodes[node_id].get('config', {}) - node_data = G.nodes[node_id].get('data', {}) - node_name = G.nodes[node_id].get('name', '') - - container_info = { - 'id': node_id, - 'name': node_name, - 'config_reagent': node_config.get('reagent', ''), - 'data_reagent': node_data.get('reagent_name', '') - } - container_list.append(container_info) - debug_print(f" - 容器: {node_id}, 名称: {node_name}, config试剂: {node_config.get('reagent', '')}, data试剂: {node_data.get('reagent_name', '')}") - - debug_print(f"❌ 找不到溶剂 '{solvent}' 对应的容器 😭") - debug_print(f"🔍 查找的溶剂: '{solvent}' (小写: '{solvent.lower()}')") - debug_print(f"📊 总共发现 {len(container_list)} 个容器") - - raise ValueError(f"找不到溶剂 '{solvent}' 对应的容器") - - def generate_recrystallize_protocol( G: nx.DiGraph, vessel: dict, # 🔧 修改:从字符串改为字典类型 @@ -322,7 +122,7 @@ def generate_recrystallize_protocol( # 2. 解析体积(支持单位) debug_print("📍 步骤2: 解析体积(支持单位)... 💧") - final_volume = parse_volume_with_units(volume, "mL") + final_volume = parse_volume_input(volume, "mL") debug_print(f"🎯 体积解析完成: {volume} → {final_volume}mL ✨") # 3. 解析比例 @@ -574,7 +374,7 @@ def test_recrystallize_protocol(): debug_print("💧 测试体积解析...") test_volumes = ["100 mL", "2.5 L", "500", "50.5", "?", "invalid"] for vol in test_volumes: - parsed = parse_volume_with_units(vol) + parsed = parse_volume_input(vol) debug_print(f" 📊 体积 '{vol}' -> {parsed}mL") # 测试比例解析 diff --git a/unilabos/compile/run_column_protocol.py b/unilabos/compile/run_column_protocol.py index f921321f..b792097d 100644 --- a/unilabos/compile/run_column_protocol.py +++ b/unilabos/compile/run_column_protocol.py @@ -8,7 +8,6 @@ logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"🏛️ [RUN_COLUMN] {message}", flush=True) logger.info(f"[RUN_COLUMN] {message}") def parse_percentage(pct_str: str) -> float: diff --git a/unilabos/compile/separate_protocol.py b/unilabos/compile/separate_protocol.py index cdd8862d..6b2800d3 100644 --- a/unilabos/compile/separate_protocol.py +++ b/unilabos/compile/separate_protocol.py @@ -1,9 +1,12 @@ +from functools import partial + import networkx as nx import re import logging import sys from typing import List, Dict, Any, Union from .utils.vessel_parser import get_vessel +from .utils.logger_util import action_log from .pump_protocol import generate_pump_protocol_with_rinsing logger = logging.getLogger(__name__) @@ -21,48 +24,17 @@ def debug_print(message): try: # 确保消息是字符串格式 safe_message = str(message) - print(f"🌀 [SEPARATE] {safe_message}", flush=True) logger.info(f"[SEPARATE] {safe_message}") except UnicodeEncodeError: # 如果编码失败,尝试替换不支持的字符 safe_message = str(message).encode('utf-8', errors='replace').decode('utf-8') - print(f"🌀 [SEPARATE] {safe_message}", flush=True) logger.info(f"[SEPARATE] {safe_message}") except Exception as e: # 最后的安全措施 fallback_message = f"日志输出错误: {repr(message)}" - print(f"🌀 [SEPARATE] {fallback_message}", flush=True) logger.info(f"[SEPARATE] {fallback_message}") -def create_action_log(message: str, emoji: str = "📝") -> Dict[str, Any]: - """创建一个动作日志 - 支持中文和emoji""" - try: - full_message = f"{emoji} {message}" - debug_print(full_message) - logger.info(full_message) - - return { - "action_name": "wait", - "action_kwargs": { - "time": 0.1, - "log_message": full_message, - "progress_message": full_message - } - } - except Exception as e: - # 如果emoji有问题,使用纯文本 - safe_message = f"[日志] {message}" - debug_print(safe_message) - logger.info(safe_message) - - return { - "action_name": "wait", - "action_kwargs": { - "time": 0.1, - "log_message": safe_message, - "progress_message": safe_message - } - } +create_action_log = partial(action_log, prefix="[SEPARATE]") def generate_separate_protocol( diff --git a/unilabos/compile/stir_protocol.py b/unilabos/compile/stir_protocol.py index e13c1f8a..64a821a8 100644 --- a/unilabos/compile/stir_protocol.py +++ b/unilabos/compile/stir_protocol.py @@ -3,81 +3,14 @@ import networkx as nx import logging import re +from .utils.unit_parser import parse_time_input + logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"🌪️ [STIR] {message}", flush=True) logger.info(f"[STIR] {message}") -def parse_time_input(time_input: Union[str, float, int], default_unit: str = "s") -> float: - """ - 统一的时间解析函数(精简版) - - Args: - time_input: 时间输入(如 "30 min", "1 h", "300", "?", 60.0) - default_unit: 默认单位(默认为秒) - - Returns: - float: 时间(秒) - """ - if not time_input: - return 100.0 # 默认100秒 - - # 🔢 处理数值输入 - if isinstance(time_input, (int, float)): - result = float(time_input) - debug_print(f"⏰ 数值时间: {time_input} → {result}s") - return result - - # 📝 处理字符串输入 - time_str = str(time_input).lower().strip() - debug_print(f"🔍 解析时间: '{time_str}'") - - # ❓ 特殊值处理 - special_times = { - '?': 300.0, 'unknown': 300.0, 'tbd': 300.0, - 'briefly': 30.0, 'quickly': 60.0, 'slowly': 600.0, - 'several minutes': 300.0, 'few minutes': 180.0, 'overnight': 3600.0 - } - - if time_str in special_times: - result = special_times[time_str] - debug_print(f"🎯 特殊时间: '{time_str}' → {result}s ({result/60:.1f}分钟)") - return result - - # 🔢 纯数字处理 - try: - result = float(time_str) - debug_print(f"⏰ 纯数字: {time_str} → {result}s") - return result - except ValueError: - pass - - # 📐 正则表达式解析 - pattern = r'(\d+\.?\d*)\s*([a-z]*)' - match = re.match(pattern, time_str) - - if not match: - debug_print(f"⚠️ 无法解析时间: '{time_str}',使用默认值: 100s") - return 100.0 - - value = float(match.group(1)) - unit = match.group(2) or default_unit - - # 📏 单位转换 - unit_multipliers = { - 's': 1.0, 'sec': 1.0, 'second': 1.0, 'seconds': 1.0, - 'm': 60.0, 'min': 60.0, 'mins': 60.0, 'minute': 60.0, 'minutes': 60.0, - 'h': 3600.0, 'hr': 3600.0, 'hrs': 3600.0, 'hour': 3600.0, 'hours': 3600.0, - 'd': 86400.0, 'day': 86400.0, 'days': 86400.0 - } - - multiplier = unit_multipliers.get(unit, 1.0) - result = value * multiplier - - debug_print(f"✅ 时间解析: '{time_str}' → {value} {unit} → {result}s ({result/60:.1f}分钟)") - return result def find_connected_stirrer(G: nx.DiGraph, vessel: str = None) -> str: """查找与指定容器相连的搅拌设备""" diff --git a/unilabos/compile/transfer_protocol.py b/unilabos/compile/transfer_protocol.py deleted file mode 100644 index 202b009f..00000000 --- a/unilabos/compile/transfer_protocol.py +++ /dev/null @@ -1,79 +0,0 @@ -from typing import List, Dict, Any -import networkx as nx - -def generate_transfer_protocol( - G: nx.DiGraph, - from_vessel: str, - to_vessel: str, - volume: float, - amount: str = "", - time: float = 0, - viscous: bool = False, - rinsing_solvent: str = "", - rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, - solid: bool = False -) -> List[Dict[str, Any]]: - """ - 生成液体转移操作的协议序列 - - Args: - G: 有向图,节点为设备和容器 - from_vessel: 源容器 - to_vessel: 目标容器 - volume: 转移体积 (mL) - amount: 数量描述 (可选) - time: 转移时间 (秒,可选) - viscous: 是否为粘性液体 - rinsing_solvent: 冲洗溶剂 (可选) - rinsing_volume: 冲洗体积 (mL,可选) - rinsing_repeats: 冲洗重复次数 - solid: 是否涉及固体 - - Returns: - List[Dict[str, Any]]: 转移操作的动作序列 - - Raises: - ValueError: 当找不到合适的转移设备时抛出异常 - - Examples: - transfer_protocol = generate_transfer_protocol(G, "flask_1", "reactor", 10.0) - """ - action_sequence = [] - - # 查找虚拟转移泵设备用于液体转移 - 修复:应该查找 virtual_transfer_pump - pump_nodes = [node for node in G.nodes() - if G.nodes[node].get('class') == 'virtual_transfer_pump'] - - if not pump_nodes: - raise ValueError("没有找到可用的转移泵设备进行液体转移") - - # 使用第一个可用的泵 - pump_id = pump_nodes[0] - - # 验证容器是否存在 - if from_vessel not in G.nodes(): - raise ValueError(f"源容器 {from_vessel} 不存在于图中") - - if to_vessel not in G.nodes(): - raise ValueError(f"目标容器 {to_vessel} 不存在于图中") - - # 执行液体转移操作 - 参数完全匹配Transfer.action - action_sequence.append({ - "device_id": pump_id, - "action_name": "transfer", - "action_kwargs": { - "from_vessel": from_vessel, - "to_vessel": to_vessel, - "volume": volume, - "amount": amount, - "time": time, - "viscous": viscous, - "rinsing_solvent": rinsing_solvent, - "rinsing_volume": rinsing_volume, - "rinsing_repeats": rinsing_repeats, - "solid": solid - } - }) - - return action_sequence \ No newline at end of file diff --git a/unilabos/compile/utils/logger_util.py b/unilabos/compile/utils/logger_util.py new file mode 100644 index 00000000..635e11e2 --- /dev/null +++ b/unilabos/compile/utils/logger_util.py @@ -0,0 +1,36 @@ +# 🆕 创建进度日志动作 +import logging +from typing import Dict, Any + +logger = logging.getLogger(__name__) + +def debug_print(message, prefix="[UNIT_PARSER]"): + """调试输出""" + logger.info(f"{prefix} {message}") + + +def action_log(message: str, emoji: str = "📝", prefix="[HIGH-LEVEL OPERATION]") -> Dict[str, Any]: + """创建一个动作日志 - 支持中文和emoji""" + try: + full_message = f"{prefix} {emoji} {message}" + + return { + "action_name": "wait", + "action_kwargs": { + "time": 0.1, + "log_message": full_message, + "progress_message": full_message + } + } + except Exception as e: + # 如果emoji有问题,使用纯文本 + safe_message = f"{prefix} {message}" + + return { + "action_name": "wait", + "action_kwargs": { + "time": 0.1, + "log_message": safe_message, + "progress_message": safe_message + } + } \ No newline at end of file diff --git a/unilabos/compile/utils/unit_parser.py b/unilabos/compile/utils/unit_parser.py index d1d297cb..19a867bd 100644 --- a/unilabos/compile/utils/unit_parser.py +++ b/unilabos/compile/utils/unit_parser.py @@ -4,108 +4,12 @@ """ import re -import logging from typing import Union -logger = logging.getLogger(__name__) +from .logger_util import debug_print -def debug_print(message, prefix="[UNIT_PARSER]"): - """调试输出""" - print(f"{prefix} {message}", flush=True) - logger.info(f"{prefix} {message}") -def parse_time_with_units(time_input: Union[str, float, int], default_unit: str = "s") -> float: - """ - 解析带单位的时间输入 - - Args: - time_input: 时间输入(如 "30 min", "1 h", "300", "?", 60.0) - default_unit: 默认单位(默认为秒) - - Returns: - float: 时间(秒) - """ - if not time_input: - return 0.0 - - # 处理数值输入 - if isinstance(time_input, (int, float)): - result = float(time_input) - debug_print(f"数值时间输入: {time_input} → {result}s(默认单位)") - return result - - # 处理字符串输入 - time_str = str(time_input).lower().strip() - debug_print(f"解析时间字符串: '{time_str}'") - - # 处理特殊值 - if time_str in ['?', 'unknown', 'tbd', 'to be determined']: - default_time = 300.0 # 5分钟默认值 - debug_print(f"检测到未知时间,使用默认值: {default_time}s") - return default_time - - # 如果是纯数字,使用默认单位 - try: - value = float(time_str) - if default_unit == "s": - result = value - elif default_unit in ["min", "minute"]: - result = value * 60.0 - elif default_unit in ["h", "hour"]: - result = value * 3600.0 - else: - result = value # 默认秒 - debug_print(f"纯数字输入: {time_str} → {result}s(单位: {default_unit})") - return result - except ValueError: - pass - - # 使用正则表达式匹配数字和单位 - pattern = r'(\d+\.?\d*)\s*([a-z]*)' - match = re.match(pattern, time_str) - - if not match: - debug_print(f"⚠️ 无法解析时间: '{time_str}',使用默认值: 60s") - return 60.0 - - value = float(match.group(1)) - unit = match.group(2) or default_unit - - # 单位转换映射 - unit_multipliers = { - # 秒 - 's': 1.0, - 'sec': 1.0, - 'second': 1.0, - 'seconds': 1.0, - - # 分钟 - 'm': 60.0, - 'min': 60.0, - 'mins': 60.0, - 'minute': 60.0, - 'minutes': 60.0, - - # 小时 - 'h': 3600.0, - 'hr': 3600.0, - 'hrs': 3600.0, - 'hour': 3600.0, - 'hours': 3600.0, - - # 天 - 'd': 86400.0, - 'day': 86400.0, - 'days': 86400.0, - } - - multiplier = unit_multipliers.get(unit, 1.0) - result = value * multiplier - - debug_print(f"时间解析: '{time_str}' → {value} {unit} → {result}s") - return result - -def parse_volume_with_units(volume_input: Union[str, float, int], default_unit: str = "mL") -> float: +def parse_volume_input(volume_input: Union[str, float, int], default_unit: str = "mL") -> float: """ 解析带单位的体积输入 @@ -175,6 +79,111 @@ def parse_volume_with_units(volume_input: Union[str, float, int], default_unit: debug_print(f"体积解析: '{volume_str}' → {value} {unit} → {volume}mL") return volume + +def parse_mass_input(mass_input: Union[str, float]) -> float: + """ + 解析质量输入,支持带单位的字符串 + + Args: + mass_input: 质量输入(如 "19.3 g", "4.5 g", 2.5) + + Returns: + float: 质量(克) + """ + if isinstance(mass_input, (int, float)): + debug_print(f"⚖️ 质量输入为数值: {mass_input}g") + return float(mass_input) + + if not mass_input or not str(mass_input).strip(): + debug_print(f"⚠️ 质量输入为空,返回0.0g") + return 0.0 + + mass_str = str(mass_input).lower().strip() + debug_print(f"🔍 解析质量输入: '{mass_str}'") + + # 移除空格并提取数字和单位 + mass_clean = re.sub(r'\s+', '', mass_str) + + # 匹配数字和单位的正则表达式 + match = re.match(r'([0-9]*\.?[0-9]+)\s*(g|mg|kg|gram|milligram|kilogram)?', mass_clean) + + if not match: + debug_print(f"❌ 无法解析质量: '{mass_str}',返回0.0g") + return 0.0 + + value = float(match.group(1)) + unit = match.group(2) or 'g' # 默认单位为克 + + # 转换为克 + if unit in ['mg', 'milligram']: + mass = value / 1000.0 # mg -> g + debug_print(f"🔄 质量转换: {value}mg → {mass}g") + elif unit in ['kg', 'kilogram']: + mass = value * 1000.0 # kg -> g + debug_print(f"🔄 质量转换: {value}kg → {mass}g") + else: # g, gram 或默认 + mass = value # 已经是g + debug_print(f"✅ 质量已为g: {mass}g") + + return mass + + +def parse_time_input(time_input: Union[str, float]) -> float: + """ + 解析时间输入,支持带单位的字符串 + + Args: + time_input: 时间输入(如 "1 h", "20 min", "30 s", 60.0) + + Returns: + float: 时间(秒) + """ + if isinstance(time_input, (int, float)): + debug_print(f"⏱️ 时间输入为数值: {time_input}秒") + return float(time_input) + + if not time_input or not str(time_input).strip(): + debug_print(f"⚠️ 时间输入为空,返回0秒") + return 0.0 + + time_str = str(time_input).lower().strip() + debug_print(f"🔍 解析时间输入: '{time_str}'") + + # 处理未知时间 + if time_str in ['?', 'unknown', 'tbd']: + default_time = 60.0 # 默认1分钟 + debug_print(f"❓ 检测到未知时间,使用默认值: {default_time}s (1分钟) ⏰") + return default_time + + # 移除空格并提取数字和单位 + time_clean = re.sub(r'\s+', '', time_str) + + # 匹配数字和单位的正则表达式 + match = re.match(r'([0-9]*\.?[0-9]+)\s*(s|sec|second|min|minute|h|hr|hour|d|day)?', time_clean) + + if not match: + debug_print(f"❌ 无法解析时间: '{time_str}',返回0s") + return 0.0 + + value = float(match.group(1)) + unit = match.group(2) or 's' # 默认单位为秒 + + # 转换为秒 + if unit in ['m', 'min', 'minute', 'mins', 'minutes']: + time_sec = value * 60.0 # min -> s + debug_print(f"🔄 时间转换: {value}分钟 → {time_sec}秒") + elif unit in ['h', 'hr', 'hour', 'hrs', 'hours']: + time_sec = value * 3600.0 # h -> s + debug_print(f"🔄 时间转换: {value}小时 → {time_sec}秒") + elif unit in ['d', 'day', 'days']: + time_sec = value * 86400.0 # d -> s + debug_print(f"🔄 时间转换: {value}天 → {time_sec}秒") + else: # s, sec, second 或默认 + time_sec = value # 已经是s + debug_print(f"✅ 时间已为秒: {time_sec}秒") + + return time_sec + # 测试函数 def test_unit_parser(): """测试单位解析功能""" @@ -187,7 +196,7 @@ def test_unit_parser(): print("\n时间解析测试:") for time_input in time_tests: - result = parse_time_with_units(time_input) + result = parse_time_input(time_input) print(f" {time_input} → {result}s ({result/60:.1f}min)") # 测试体积解析 @@ -197,7 +206,7 @@ def test_unit_parser(): print("\n体积解析测试:") for volume_input in volume_tests: - result = parse_volume_with_units(volume_input) + result = parse_volume_input(volume_input) print(f" {volume_input} → {result}mL") print("\n✅ 测试完成") diff --git a/unilabos/compile/utils/vessel_parser.py b/unilabos/compile/utils/vessel_parser.py index 01275816..eb9218aa 100644 --- a/unilabos/compile/utils/vessel_parser.py +++ b/unilabos/compile/utils/vessel_parser.py @@ -1,3 +1,8 @@ +import networkx as nx + +from .logger_util import debug_print + + def get_vessel(vessel): """ 统一处理vessel参数,返回vessel_id和vessel_data。 @@ -18,3 +23,258 @@ def get_vessel(vessel): vessel_id = str(vessel) vessel_data = {} return vessel_id, vessel_data + + +def find_reagent_vessel(G: nx.DiGraph, reagent: str) -> str: + """增强版试剂容器查找,支持固体和液体""" + debug_print(f"🔍 开始查找试剂 '{reagent}' 的容器...") + + # 🔧 方法1:直接搜索 data.reagent_name 和 config.reagent + debug_print(f"📋 方法1: 搜索reagent字段...") + for node in G.nodes(): + node_data = G.nodes[node].get('data', {}) + node_type = G.nodes[node].get('type', '') + config_data = G.nodes[node].get('config', {}) + + # 只搜索容器类型的节点 + if node_type == 'container': + reagent_name = node_data.get('reagent_name', '').lower() + config_reagent = config_data.get('reagent', '').lower() + + # 精确匹配 + if reagent_name == reagent.lower() or config_reagent == reagent.lower(): + debug_print(f"✅ 通过reagent字段精确匹配到容器: {node} 🎯") + return node + + # 模糊匹配 + if (reagent.lower() in reagent_name and reagent_name) or \ + (reagent.lower() in config_reagent and config_reagent): + debug_print(f"✅ 通过reagent字段模糊匹配到容器: {node} 🔍") + return node + + # 🔧 方法2:常见的容器命名规则 + debug_print(f"📋 方法2: 使用命名规则查找...") + reagent_clean = reagent.lower().replace(' ', '_').replace('-', '_') + possible_names = [ + reagent_clean, + f"flask_{reagent_clean}", + f"bottle_{reagent_clean}", + f"vessel_{reagent_clean}", + f"{reagent_clean}_flask", + f"{reagent_clean}_bottle", + f"reagent_{reagent_clean}", + f"reagent_bottle_{reagent_clean}", + f"solid_reagent_bottle_{reagent_clean}", + f"reagent_bottle_1", # 通用试剂瓶 + f"reagent_bottle_2", + f"reagent_bottle_3" + ] + + debug_print(f"🔍 尝试的容器名称: {possible_names[:5]}... (共{len(possible_names)}个)") + + for name in possible_names: + if name in G.nodes(): + node_type = G.nodes[name].get('type', '') + if node_type == 'container': + debug_print(f"✅ 通过命名规则找到容器: {name} 📝") + return name + + # 🔧 方法3:节点名称模糊匹配 + debug_print(f"📋 方法3: 节点名称模糊匹配...") + for node_id in G.nodes(): + node_data = G.nodes[node_id] + if node_data.get('type') == 'container': + # 检查节点名称是否包含试剂名称 + if reagent_clean in node_id.lower(): + debug_print(f"✅ 通过节点名称模糊匹配到容器: {node_id} 🔍") + return node_id + + # 检查液体类型匹配 + vessel_data = node_data.get('data', {}) + liquids = vessel_data.get('liquid', []) + for liquid in liquids: + if isinstance(liquid, dict): + liquid_type = liquid.get('liquid_type') or liquid.get('name', '') + if liquid_type.lower() == reagent.lower(): + debug_print(f"✅ 通过液体类型匹配到容器: {node_id} 💧") + return node_id + + # 🔧 方法4:使用第一个试剂瓶作为备选 + debug_print(f"📋 方法4: 查找备选试剂瓶...") + for node_id in G.nodes(): + node_data = G.nodes[node_id] + if (node_data.get('type') == 'container' and + ('reagent' in node_id.lower() or 'bottle' in node_id.lower())): + debug_print(f"⚠️ 未找到专用容器,使用备选试剂瓶: {node_id} 🔄") + return node_id + + debug_print(f"❌ 所有方法都失败了,无法找到容器!") + raise ValueError(f"找不到试剂 '{reagent}' 对应的容器") + + +def find_solvent_vessel(G: nx.DiGraph, solvent: str) -> str: + """ + 查找溶剂容器 + + Args: + G: 网络图 + solvent: 溶剂名称 + + Returns: + str: 溶剂容器ID + """ + debug_print(f"🔍 正在查找溶剂 '{solvent}' 的容器... 🧪") + + # 构建可能的容器名称 + possible_names = [ + f"flask_{solvent}", + f"bottle_{solvent}", + f"reagent_{solvent}", + f"reagent_bottle_{solvent}", + f"{solvent}_flask", + f"{solvent}_bottle", + f"{solvent}", + f"vessel_{solvent}", + ] + + debug_print(f"📋 候选容器名称: {possible_names[:3]}... (共{len(possible_names)}个) 📝") + + # 第一步:通过容器名称匹配 + debug_print(" 🎯 步骤1: 精确名称匹配...") + for vessel_name in possible_names: + if vessel_name in G.nodes(): + debug_print(f" 🎉 通过名称匹配找到容器: {vessel_name} ✨") + return vessel_name + + # 第二步:通过模糊匹配(节点ID和名称) + debug_print(" 🔍 步骤2: 模糊名称匹配...") + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + node_name = G.nodes[node_id].get('name', '').lower() + + if solvent.lower() in node_id.lower() or solvent.lower() in node_name: + debug_print(f" 🎉 通过模糊匹配找到容器: {node_id} (名称: {node_name}) ✨") + return node_id + + # 第三步:通过配置中的试剂信息匹配 + debug_print(" 🧪 步骤3: 配置试剂信息匹配...") + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + # 检查 config 中的 reagent 字段 + node_config = G.nodes[node_id].get('config', {}) + config_reagent = node_config.get('reagent', '').lower() + + if config_reagent and solvent.lower() == config_reagent: + debug_print(f" 🎉 通过config.reagent匹配找到容器: {node_id} (试剂: {config_reagent}) ✨") + return node_id + + # 第四步:通过数据中的试剂信息匹配 + debug_print(" 🧪 步骤4: 数据试剂信息匹配...") + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + vessel_data = G.nodes[node_id].get('data', {}) + + # 检查 data 中的 reagent_name 字段 + reagent_name = vessel_data.get('reagent_name', '').lower() + if reagent_name and solvent.lower() == reagent_name: + debug_print(f" 🎉 通过data.reagent_name匹配找到容器: {node_id} (试剂: {reagent_name}) ✨") + return node_id + + # 检查 data 中的液体信息 + liquids = vessel_data.get('liquid', []) + for liquid in liquids: + if isinstance(liquid, dict): + liquid_type = (liquid.get('liquid_type') or liquid.get('name', '')).lower() + + if solvent.lower() in liquid_type: + debug_print(f" 🎉 通过液体类型匹配找到容器: {node_id} (液体类型: {liquid_type}) ✨") + return node_id + + # 第五步:部分匹配(如果前面都没找到) + debug_print(" 🔍 步骤5: 部分匹配...") + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + node_config = G.nodes[node_id].get('config', {}) + node_data = G.nodes[node_id].get('data', {}) + node_name = G.nodes[node_id].get('name', '').lower() + + config_reagent = node_config.get('reagent', '').lower() + data_reagent = node_data.get('reagent_name', '').lower() + + # 检查是否包含溶剂名称 + if (solvent.lower() in config_reagent or + solvent.lower() in data_reagent or + solvent.lower() in node_name or + solvent.lower() in node_id.lower()): + debug_print(f" 🎉 通过部分匹配找到容器: {node_id} ✨") + debug_print(f" - 节点名称: {node_name}") + debug_print(f" - 配置试剂: {config_reagent}") + debug_print(f" - 数据试剂: {data_reagent}") + return node_id + + # 调试信息:列出所有容器 + debug_print(" 🔎 调试信息:列出所有容器...") + container_list = [] + for node_id in G.nodes(): + if G.nodes[node_id].get('type') == 'container': + node_config = G.nodes[node_id].get('config', {}) + node_data = G.nodes[node_id].get('data', {}) + node_name = G.nodes[node_id].get('name', '') + + container_info = { + 'id': node_id, + 'name': node_name, + 'config_reagent': node_config.get('reagent', ''), + 'data_reagent': node_data.get('reagent_name', '') + } + container_list.append(container_info) + debug_print( + f" - 容器: {node_id}, 名称: {node_name}, config试剂: {node_config.get('reagent', '')}, data试剂: {node_data.get('reagent_name', '')}") + + debug_print(f"❌ 找不到溶剂 '{solvent}' 对应的容器 😭") + debug_print(f"🔍 查找的溶剂: '{solvent}' (小写: '{solvent.lower()}')") + debug_print(f"📊 总共发现 {len(container_list)} 个容器") + + raise ValueError(f"找不到溶剂 '{solvent}' 对应的容器") + + +def find_connected_stirrer(G: nx.DiGraph, vessel: str) -> str: + """查找连接到指定容器的搅拌器""" + debug_print(f"🔍 查找连接到容器 '{vessel}' 的搅拌器...") + + stirrer_nodes = [] + for node in G.nodes(): + node_class = G.nodes[node].get('class', '').lower() + if 'stirrer' in node_class: + stirrer_nodes.append(node) + debug_print(f"📋 发现搅拌器: {node}") + + debug_print(f"📊 共找到 {len(stirrer_nodes)} 个搅拌器") + + # 查找连接到容器的搅拌器 + for stirrer in stirrer_nodes: + if G.has_edge(stirrer, vessel) or G.has_edge(vessel, stirrer): + debug_print(f"✅ 找到连接的搅拌器: {stirrer} 🔗") + return stirrer + + # 返回第一个搅拌器 + if stirrer_nodes: + debug_print(f"⚠️ 未找到直接连接的搅拌器,使用第一个: {stirrer_nodes[0]} 🔄") + return stirrer_nodes[0] + + debug_print(f"❌ 未找到任何搅拌器") + return "" + + +def find_solid_dispenser(G: nx.DiGraph) -> str: + """查找固体加样器""" + debug_print(f"🔍 查找固体加样器...") + + for node in G.nodes(): + node_class = G.nodes[node].get('class', '').lower() + if 'solid_dispenser' in node_class or 'dispenser' in node_class: + debug_print(f"✅ 找到固体加样器: {node} 🥄") + return node + + debug_print(f"❌ 未找到固体加样器") + return "" \ No newline at end of file diff --git a/unilabos/compile/wash_solid_protocol.py b/unilabos/compile/wash_solid_protocol.py index b167c85e..a295d6ee 100644 --- a/unilabos/compile/wash_solid_protocol.py +++ b/unilabos/compile/wash_solid_protocol.py @@ -3,118 +3,14 @@ import networkx as nx import logging import re +from .utils.unit_parser import parse_time_input, parse_volume_input + logger = logging.getLogger(__name__) def debug_print(message): """调试输出""" - print(f"🧼 [WASH_SOLID] {message}", flush=True) logger.info(f"[WASH_SOLID] {message}") -def parse_time_input(time_input: Union[str, float, int]) -> float: - """统一时间解析函数(精简版)""" - if not time_input: - return 0.0 - - # 🔢 处理数值输入 - if isinstance(time_input, (int, float)): - result = float(time_input) - debug_print(f"⏰ 数值时间: {time_input} → {result}s") - return result - - # 📝 处理字符串输入 - time_str = str(time_input).lower().strip() - - # ❓ 特殊值快速处理 - special_times = { - '?': 60.0, 'unknown': 60.0, 'briefly': 30.0, - 'quickly': 45.0, 'slowly': 120.0 - } - - if time_str in special_times: - result = special_times[time_str] - debug_print(f"🎯 特殊时间: '{time_str}' → {result}s") - return result - - # 🔢 数字提取(简化正则) - try: - # 提取数字 - numbers = re.findall(r'\d+\.?\d*', time_str) - if numbers: - value = float(numbers[0]) - - # 简化单位判断 - if any(unit in time_str for unit in ['min', 'm']): - result = value * 60.0 - elif any(unit in time_str for unit in ['h', 'hour']): - result = value * 3600.0 - else: - result = value # 默认秒 - - debug_print(f"✅ 时间解析: '{time_str}' → {result}s") - return result - except: - pass - - debug_print(f"⚠️ 时间解析失败: '{time_str}',使用默认60s") - return 60.0 - -def parse_volume_input(volume: Union[float, str], volume_spec: str = "", mass: str = "") -> float: - """统一体积解析函数(精简版)""" - debug_print(f"💧 解析体积: volume={volume}, spec='{volume_spec}', mass='{mass}'") - - # 🎯 优先级1:volume_spec(快速映射) - if volume_spec: - spec_map = { - 'small': 20.0, 'medium': 50.0, 'large': 100.0, - 'minimal': 10.0, 'normal': 50.0, 'generous': 150.0 - } - for key, val in spec_map.items(): - if key in volume_spec.lower(): - debug_print(f"🎯 规格匹配: '{volume_spec}' → {val}mL") - return val - - # 🧮 优先级2:mass转体积(简化:1g=1mL) - if mass: - try: - numbers = re.findall(r'\d+\.?\d*', mass) - if numbers: - value = float(numbers[0]) - if 'mg' in mass.lower(): - result = value / 1000.0 - elif 'kg' in mass.lower(): - result = value * 1000.0 - else: - result = value # 默认g - debug_print(f"⚖️ 质量转换: {mass} → {result}mL") - return result - except: - pass - - # 📦 优先级3:volume - if volume: - if isinstance(volume, (int, float)): - result = float(volume) - debug_print(f"💧 数值体积: {volume} → {result}mL") - return result - elif isinstance(volume, str): - try: - # 提取数字 - numbers = re.findall(r'\d+\.?\d*', volume) - if numbers: - value = float(numbers[0]) - # 简化单位判断 - if 'l' in volume.lower() and 'ml' not in volume.lower(): - result = value * 1000.0 # L转mL - else: - result = value # 默认mL - debug_print(f"💧 字符串体积: '{volume}' → {result}mL") - return result - except: - pass - - # 默认值 - debug_print(f"⚠️ 体积解析失败,使用默认50mL") - return 50.0 def find_solvent_source(G: nx.DiGraph, solvent: str) -> str: """查找溶剂源(精简版)""" diff --git a/unilabos/devices/laiyu_add_solid/__init__.py b/unilabos/devices/laiyu_add_solid/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/unilabos/devices/laiyu_add_solid/laiyu.py b/unilabos/devices/powder_dispense/laiyu.py similarity index 100% rename from unilabos/devices/laiyu_add_solid/laiyu.py rename to unilabos/devices/powder_dispense/laiyu.py diff --git a/unilabos/devices/ChinWe/chinwe.py b/unilabos/devices/separator/chinwe.py similarity index 100% rename from unilabos/devices/ChinWe/chinwe.py rename to unilabos/devices/separator/chinwe.py diff --git a/unilabos/devices/virtual/virtual_heatchill.py b/unilabos/devices/virtual/virtual_heatchill.py index 94ab5720..71023320 100644 --- a/unilabos/devices/virtual/virtual_heatchill.py +++ b/unilabos/devices/virtual/virtual_heatchill.py @@ -67,7 +67,7 @@ class VirtualHeatChill: self.logger.info(f"✅ 温控设备 {self.device_id} 清理完成 💤") return True - async def heat_chill(self, vessel: str, temp: float, time, stir: bool, + async def heat_chill(self, temp: float, time, stir: bool, stir_speed: float, purpose: str) -> bool: """Execute heat chill action - 🔧 修复:确保参数类型正确""" @@ -77,7 +77,6 @@ class VirtualHeatChill: time_value = float(time) # 强制转换为浮点数 stir_speed = float(stir_speed) stir = bool(stir) - vessel = str(vessel) purpose = str(purpose) except (ValueError, TypeError) as e: error_msg = f"参数类型转换错误: temp={temp}({type(temp)}), time={time}({type(time)}), error={str(e)}" @@ -102,8 +101,7 @@ class VirtualHeatChill: operation_mode = "Maintaining" status_action = "保温" - self.logger.info(f"🌡️ 开始温控操作: {vessel} → {temp}°C {temp_emoji}") - self.logger.info(f" 🥽 容器: {vessel}") + self.logger.info(f"🌡️ 开始温控操作: {temp}°C {temp_emoji}") self.logger.info(f" 🎯 目标温度: {temp}°C {temp_emoji}") self.logger.info(f" ⏰ 持续时间: {time_value}s") self.logger.info(f" 🌪️ 搅拌: {stir} ({stir_speed} RPM)") @@ -147,7 +145,7 @@ class VirtualHeatChill: stir_info = f" | 🌪️ 搅拌: {stir_speed} RPM" if stir else "" self.data.update({ - "status": f"{temp_emoji} 运行中: {status_action} {vessel} 至 {temp}°C | ⏰ 剩余: {total_time:.0f}s{stir_info}", + "status": f"{temp_emoji} 运行中: {status_action} 至 {temp}°C | ⏰ 剩余: {total_time:.0f}s{stir_info}", "operation_mode": operation_mode, "is_stirring": stir, "stir_speed": stir_speed if stir else 0.0, @@ -165,7 +163,7 @@ class VirtualHeatChill: # 更新剩余时间和状态 self.data.update({ "remaining_time": remaining, - "status": f"{temp_emoji} 运行中: {status_action} {vessel} 至 {temp}°C | ⏰ 剩余: {remaining:.0f}s{stir_info}", + "status": f"{temp_emoji} 运行中: {status_action} 至 {temp}°C | ⏰ 剩余: {remaining:.0f}s{stir_info}", "progress": progress }) @@ -185,7 +183,7 @@ class VirtualHeatChill: final_stir_info = f" | 🌪️ 搅拌: {stir_speed} RPM" if stir else "" self.data.update({ - "status": f"✅ 完成: {vessel} 已达到 {temp}°C {temp_emoji} | ⏱️ 用时: {total_time:.0f}s{final_stir_info}", + "status": f"✅ 完成: 已达到 {temp}°C {temp_emoji} | ⏱️ 用时: {total_time:.0f}s{final_stir_info}", "operation_mode": "Completed", "remaining_time": 0.0, "is_stirring": False, @@ -195,7 +193,6 @@ class VirtualHeatChill: self.logger.info(f"🎉 温控操作完成! ✨") self.logger.info(f"📊 操作结果:") - self.logger.info(f" 🥽 容器: {vessel}") self.logger.info(f" 🌡️ 达到温度: {temp}°C {temp_emoji}") self.logger.info(f" ⏱️ 总用时: {total_time:.0f}s") if stir: @@ -204,13 +201,12 @@ class VirtualHeatChill: return True - async def heat_chill_start(self, vessel: str, temp: float, purpose: str) -> bool: + async def heat_chill_start(self, temp: float, purpose: str) -> bool: """Start continuous heat chill 🔄""" # 🔧 添加类型转换 try: temp = float(temp) - vessel = str(vessel) purpose = str(purpose) except (ValueError, TypeError) as e: error_msg = f"参数类型转换错误: {str(e)}" @@ -235,8 +231,7 @@ class VirtualHeatChill: operation_mode = "Maintaining" status_action = "恒温保持" - self.logger.info(f"🔄 启动持续温控: {vessel} → {temp}°C {temp_emoji}") - self.logger.info(f" 🥽 容器: {vessel}") + self.logger.info(f"🔄 启动持续温控: {temp}°C {temp_emoji}") self.logger.info(f" 🎯 目标温度: {temp}°C {temp_emoji}") self.logger.info(f" 🔄 模式: {status_action}") self.logger.info(f" 📝 目的: {purpose}") @@ -252,7 +247,7 @@ class VirtualHeatChill: return False self.data.update({ - "status": f"🔄 启动: {status_action} {vessel} 至 {temp}°C {temp_emoji} | ♾️ 持续运行", + "status": f"🔄 启动: {status_action} 至 {temp}°C {temp_emoji} | ♾️ 持续运行", "operation_mode": operation_mode, "is_stirring": False, "stir_speed": 0.0, @@ -262,28 +257,20 @@ class VirtualHeatChill: self.logger.info(f"✅ 持续温控已启动! {temp_emoji} {status_action}模式 🚀") return True - async def heat_chill_stop(self, vessel: str) -> bool: + async def heat_chill_stop(self) -> bool: """Stop heat chill 🛑""" - # 🔧 添加类型转换 - try: - vessel = str(vessel) - except (ValueError, TypeError) as e: - error_msg = f"参数类型转换错误: {str(e)}" - self.logger.error(f"❌ {error_msg}") - return False - - self.logger.info(f"🛑 停止温控: {vessel}") + self.logger.info(f"🛑 停止温控:") self.data.update({ - "status": f"🛑 已停止: {vessel} 温控停止", + "status": f"🛑 {self.device_id} 温控停止", "operation_mode": "Stopped", "is_stirring": False, "stir_speed": 0.0, "remaining_time": 0.0, }) - self.logger.info(f"✅ 温控设备已停止 {vessel} 的温度控制 🏁") + self.logger.info(f"✅ 温控设备已停止 {self.device_id} 温度控制 🏁") return True # 状态属性 diff --git a/unilabos/devices/virtual/virtual_pump.py b/unilabos/devices/virtual/virtual_pump.py deleted file mode 100644 index d134319a..00000000 --- a/unilabos/devices/virtual/virtual_pump.py +++ /dev/null @@ -1,197 +0,0 @@ -import asyncio -import logging -from typing import Dict, Any, Optional - -class VirtualPump: - """Virtual pump device for transfer and cleaning operations""" - - def __init__(self, device_id: str = None, config: Dict[str, Any] = None, **kwargs): - # 处理可能的不同调用方式 - if device_id is None and 'id' in kwargs: - device_id = kwargs.pop('id') - if config is None and 'config' in kwargs: - config = kwargs.pop('config') - - # 设置默认值 - self.device_id = device_id or "unknown_pump" - self.config = config or {} - - self.logger = logging.getLogger(f"VirtualPump.{self.device_id}") - self.data = {} - - # 从config或kwargs中获取配置参数 - self.port = self.config.get('port') or kwargs.get('port', 'VIRTUAL') - self._max_volume = self.config.get('max_volume') or kwargs.get('max_volume', 50.0) - self._transfer_rate = self.config.get('transfer_rate') or kwargs.get('transfer_rate', 10.0) - - print(f"=== VirtualPump {self.device_id} created with max_volume={self._max_volume}, transfer_rate={self._transfer_rate} ===") - - async def initialize(self) -> bool: - """Initialize virtual pump""" - self.logger.info(f"Initializing virtual pump {self.device_id}") - self.data.update({ - "status": "Idle", - "valve_position": 0, - "current_volume": 0.0, - "max_volume": self._max_volume, - "transfer_rate": self._transfer_rate, - "from_vessel": "", - "to_vessel": "", - "progress": 0.0, - "transferred_volume": 0.0, - "current_status": "Ready" - }) - return True - - async def cleanup(self) -> bool: - """Cleanup virtual pump""" - self.logger.info(f"Cleaning up virtual pump {self.device_id}") - return True - - async def transfer(self, from_vessel: str, to_vessel: str, volume: float, - amount: str = "", time: float = 0.0, viscous: bool = False, - rinsing_solvent: str = "", rinsing_volume: float = 0.0, - rinsing_repeats: int = 0, solid: bool = False) -> bool: - """Execute transfer operation""" - self.logger.info(f"Transferring {volume}mL from {from_vessel} to {to_vessel}") - - # 计算转移时间 - transfer_time = volume / self._transfer_rate if time == 0 else time - - self.data.update({ - "status": "Running", - "from_vessel": from_vessel, - "to_vessel": to_vessel, - "current_status": "Transferring", - "progress": 0.0, - "transferred_volume": 0.0 - }) - - # 模拟转移过程 - steps = 10 - step_time = transfer_time / steps - step_volume = volume / steps - - for i in range(steps): - await asyncio.sleep(step_time) - progress = (i + 1) / steps * 100 - current_volume = step_volume * (i + 1) - - self.data.update({ - "progress": progress, - "transferred_volume": current_volume, - "current_status": f"Transferring: {progress:.1f}%" - }) - - self.logger.info(f"Transfer progress: {progress:.1f}%") - - self.data.update({ - "status": "Idle", - "current_status": "Transfer completed", - "progress": 100.0, - "transferred_volume": volume - }) - - return True - - async def clean_vessel(self, vessel: str, solvent: str, volume: float, - temp: float, repeats: int = 1) -> bool: - """Execute vessel cleaning operation - matches CleanVessel action""" - self.logger.info(f"Starting vessel cleaning: {vessel} with {solvent} ({volume}mL at {temp}°C, {repeats} repeats)") - - # 更新设备状态 - self.data.update({ - "status": "Running", - "from_vessel": f"flask_{solvent}", - "to_vessel": vessel, - "current_status": "Cleaning in progress", - "progress": 0.0, - "transferred_volume": 0.0 - }) - - # 计算清洗时间(基于体积和重复次数) - # 假设清洗速度为 transfer_rate 的一半(因为需要加载和排放) - cleaning_rate = self._transfer_rate / 2 - cleaning_time_per_cycle = volume / cleaning_rate - total_cleaning_time = cleaning_time_per_cycle * repeats - - # 模拟清洗过程 - steps_per_repeat = 10 # 每次重复清洗分10个步骤 - total_steps = steps_per_repeat * repeats - step_time = total_cleaning_time / total_steps - - for repeat in range(repeats): - self.logger.info(f"Starting cleaning cycle {repeat + 1}/{repeats}") - - for step in range(steps_per_repeat): - await asyncio.sleep(step_time) - - # 计算当前进度 - current_step = repeat * steps_per_repeat + step + 1 - progress = (current_step / total_steps) * 100 - - # 计算已处理的体积 - volume_processed = (current_step / total_steps) * volume * repeats - - # 更新状态 - self.data.update({ - "progress": progress, - "transferred_volume": volume_processed, - "current_status": f"Cleaning cycle {repeat + 1}/{repeats} - Step {step + 1}/{steps_per_repeat} ({progress:.1f}%)" - }) - - self.logger.info(f"Cleaning progress: {progress:.1f}% (Cycle {repeat + 1}/{repeats})") - - # 清洗完成 - self.data.update({ - "status": "Idle", - "current_status": "Cleaning completed successfully", - "progress": 100.0, - "transferred_volume": volume * repeats, - "from_vessel": "", - "to_vessel": "" - }) - - self.logger.info(f"Vessel cleaning completed: {vessel}") - return True - - # 状态属性 - @property - def status(self) -> str: - return self.data.get("status", "Unknown") - - @property - def valve_position(self) -> int: - return self.data.get("valve_position", 0) - - @property - def current_volume(self) -> float: - return self.data.get("current_volume", 0.0) - - @property - def max_volume(self) -> float: - return self.data.get("max_volume", 0.0) - - @property - def transfer_rate(self) -> float: - return self.data.get("transfer_rate", 0.0) - - @property - def from_vessel(self) -> str: - return self.data.get("from_vessel", "") - - @property - def to_vessel(self) -> str: - return self.data.get("to_vessel", "") - - @property - def progress(self) -> float: - return self.data.get("progress", 0.0) - - @property - def transferred_volume(self) -> float: - return self.data.get("transferred_volume", 0.0) - - @property - def current_status(self) -> str: - return self.data.get("current_status", "Ready") \ No newline at end of file diff --git a/unilabos/registry/devices/pump_and_valve.yaml b/unilabos/registry/devices/pump_and_valve.yaml index 9149e1b7..7fc3a20b 100644 --- a/unilabos/registry/devices/pump_and_valve.yaml +++ b/unilabos/registry/devices/pump_and_valve.yaml @@ -366,525 +366,6 @@ solenoid_valve.mock: - valve_position type: object version: 1.0.0 -syringe_pump_with_valve.runze.SY03B-T08: - category: - - pump_and_valve - class: - action_value_mappings: - auto-close: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: close的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: close参数 - type: object - type: UniLabJsonCommand - auto-initialize: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: initialize的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: initialize参数 - type: object - type: UniLabJsonCommand - auto-pull_plunger: - feedback: {} - goal: {} - goal_default: - volume: null - handles: [] - result: {} - schema: - description: pull_plunger的参数schema - properties: - feedback: {} - goal: - properties: - volume: - type: number - required: - - volume - type: object - result: {} - required: - - goal - title: pull_plunger参数 - type: object - type: UniLabJsonCommand - auto-push_plunger: - feedback: {} - goal: {} - goal_default: - volume: null - handles: [] - result: {} - schema: - description: push_plunger的参数schema - properties: - feedback: {} - goal: - properties: - volume: - type: number - required: - - volume - type: object - result: {} - required: - - goal - title: push_plunger参数 - type: object - type: UniLabJsonCommand - auto-query_aux_input_status_1: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: query_aux_input_status_1的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: query_aux_input_status_1参数 - type: object - type: UniLabJsonCommand - auto-query_aux_input_status_2: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: query_aux_input_status_2的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: query_aux_input_status_2参数 - type: object - type: UniLabJsonCommand - auto-query_backlash_position: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: query_backlash_position的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: query_backlash_position参数 - type: object - type: UniLabJsonCommand - auto-query_command_buffer_status: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: query_command_buffer_status的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: query_command_buffer_status参数 - type: object - type: UniLabJsonCommand - auto-query_software_version: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: query_software_version的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: query_software_version参数 - type: object - type: UniLabJsonCommand - auto-send_command: - feedback: {} - goal: {} - goal_default: - full_command: null - handles: [] - result: {} - schema: - description: send_command的参数schema - properties: - feedback: {} - goal: - properties: - full_command: - type: string - required: - - full_command - type: object - result: {} - required: - - goal - title: send_command参数 - type: object - type: UniLabJsonCommand - auto-set_baudrate: - feedback: {} - goal: {} - goal_default: - baudrate: null - handles: [] - result: {} - schema: - description: set_baudrate的参数schema - properties: - feedback: {} - goal: - properties: - baudrate: - type: string - required: - - baudrate - type: object - result: {} - required: - - goal - title: set_baudrate参数 - type: object - type: UniLabJsonCommand - auto-set_max_velocity: - feedback: {} - goal: {} - goal_default: - velocity: null - handles: [] - result: {} - schema: - description: set_max_velocity的参数schema - properties: - feedback: {} - goal: - properties: - velocity: - type: number - required: - - velocity - type: object - result: {} - required: - - goal - title: set_max_velocity参数 - type: object - type: UniLabJsonCommand - auto-set_position: - feedback: {} - goal: {} - goal_default: - max_velocity: null - position: null - handles: [] - result: {} - schema: - description: set_position的参数schema - properties: - feedback: {} - goal: - properties: - max_velocity: - type: number - position: - type: number - required: - - position - type: object - result: {} - required: - - goal - title: set_position参数 - type: object - type: UniLabJsonCommand - auto-set_valve_position: - feedback: {} - goal: {} - goal_default: - position: null - handles: [] - result: {} - schema: - description: set_valve_position的参数schema - properties: - feedback: {} - goal: - properties: - position: - type: string - required: - - position - type: object - result: {} - required: - - goal - title: set_valve_position参数 - type: object - type: UniLabJsonCommand - auto-set_velocity_grade: - feedback: {} - goal: {} - goal_default: - velocity: null - handles: [] - result: {} - schema: - description: set_velocity_grade的参数schema - properties: - feedback: {} - goal: - properties: - velocity: - type: string - required: - - velocity - type: object - result: {} - required: - - goal - title: set_velocity_grade参数 - type: object - type: UniLabJsonCommand - auto-stop_operation: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: stop_operation的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: stop_operation参数 - type: object - type: UniLabJsonCommand - auto-wait_error: - feedback: {} - goal: {} - goal_default: {} - handles: [] - result: {} - schema: - description: wait_error的参数schema - properties: - feedback: {} - goal: - properties: {} - required: [] - type: object - result: {} - required: - - goal - title: wait_error参数 - type: object - type: UniLabJsonCommand - hardware_interface: - name: hardware_interface - read: send_command - write: send_command - module: unilabos.devices.pump_and_valve.runze_backbone:RunzeSyringePump - status_types: - max_velocity: float - mode: int - plunger_position: String - position: float - status: str - valve_position: str - velocity_end: String - velocity_grade: String - velocity_init: String - type: python - config_info: [] - description: 润泽精密注射泵设备,集成阀门控制的高精度流体输送系统。该设备通过串口通信控制,支持多种运行模式和精确的体积控制。具备可变速度控制、精密定位、阀门切换、实时状态监控等功能。适用于微量液体输送、精密进样、流速控制、化学反应进料等需要高精度流体操作的实验室自动化应用。 - handles: - - data_key: fluid_port_1 - data_source: executor - data_type: fluid - description: 八通阀门端口1 - handler_key: '1' - io_type: source - label: '1' - side: NORTH - - data_key: fluid_port_2 - data_source: executor - data_type: fluid - description: 八通阀门端口2 - handler_key: '2' - io_type: source - label: '2' - side: EAST - - data_key: fluid_port_3 - data_source: executor - data_type: fluid - description: 八通阀门端口3 - handler_key: '3' - io_type: source - label: '3' - side: EAST - - data_key: fluid_port_4 - data_source: executor - data_type: fluid - description: 八通阀门端口4 - handler_key: '4' - io_type: source - label: '4' - side: SOUTH - - data_key: fluid_port_5 - data_source: executor - data_type: fluid - description: 八通阀门端口5 - handler_key: '5' - io_type: source - label: '5' - side: SOUTH - - data_key: fluid_port_6 - data_source: executor - data_type: fluid - description: 八通阀门端口6 - handler_key: '6' - io_type: source - label: '6' - side: WEST - - data_key: fluid_port_7 - data_source: executor - data_type: fluid - description: 八通阀门端口7 - handler_key: '7' - io_type: source - label: '7' - side: WEST - - data_key: fluid_port_8 - data_source: executor - data_type: fluid - description: 八通阀门端口8-特殊输入 - handler_key: '8' - io_type: target - label: '8' - side: WEST - - data_key: fluid_port_8 - data_source: executor - data_type: fluid - description: 八通阀门端口8 - handler_key: '8' - io_type: source - label: '8' - side: NORTH - init_param_schema: - config: - properties: - address: - default: '1' - type: string - max_volume: - default: 25.0 - type: number - mode: - type: string - port: - type: string - required: - - port - type: object - data: - properties: - max_velocity: - type: number - mode: - type: integer - plunger_position: - type: string - position: - type: number - status: - type: string - valve_position: - type: string - velocity_end: - type: string - velocity_grade: - type: string - velocity_init: - type: string - required: - - status - - mode - - max_velocity - - velocity_grade - - velocity_init - - velocity_end - - valve_position - - position - - plunger_position - type: object - version: 1.0.0 syringe_pump_with_valve.runze.SY03B-T06: category: - pump_and_valve @@ -1284,62 +765,583 @@ syringe_pump_with_valve.runze.SY03B-T06: config_info: [] description: 润泽精密注射泵设备,集成阀门控制的高精度流体输送系统。该设备通过串口通信控制,支持多种运行模式和精确的体积控制。具备可变速度控制、精密定位、阀门切换、实时状态监控等功能。适用于微量液体输送、精密进样、流速控制、化学反应进料等需要高精度流体操作的实验室自动化应用。 handles: - - data_key: fluid_port_1 - data_source: executor - data_type: fluid - description: 八通阀门端口1 - handler_key: '1' - io_type: source - label: '1' - side: NORTH - - data_key: fluid_port_2 - data_source: executor - data_type: fluid - description: 八通阀门端口2 - handler_key: '2' - io_type: source - label: '2' - side: EAST - - data_key: fluid_port_3 - data_source: executor - data_type: fluid - description: 八通阀门端口3 - handler_key: '3' - io_type: source - label: '3' - side: SOUTH - - data_key: fluid_port_4 - data_source: executor - data_type: fluid - description: 八通阀门端口4 - handler_key: '4' - io_type: source - label: '4' - side: SOUTH - - data_key: fluid_port_5 - data_source: executor - data_type: fluid - description: 八通阀门端口5 - handler_key: '5' - io_type: source - label: '5' - side: EAST - - data_key: fluid_port_6 - data_source: executor - data_type: fluid - description: 八通阀门端口6 - handler_key: '6' - io_type: source - label: '6' - side: NORTH - - data_key: fluid_port_6 - data_source: executor - data_type: fluid - description: 六通阀门端口6-特殊输入 - handler_key: '6' - io_type: target - label: '6-in' - side: WEST + - data_key: fluid_port_1 + data_source: executor + data_type: fluid + description: 八通阀门端口1 + handler_key: '1' + io_type: source + label: '1' + side: NORTH + - data_key: fluid_port_2 + data_source: executor + data_type: fluid + description: 八通阀门端口2 + handler_key: '2' + io_type: source + label: '2' + side: EAST + - data_key: fluid_port_3 + data_source: executor + data_type: fluid + description: 八通阀门端口3 + handler_key: '3' + io_type: source + label: '3' + side: SOUTH + - data_key: fluid_port_4 + data_source: executor + data_type: fluid + description: 八通阀门端口4 + handler_key: '4' + io_type: source + label: '4' + side: SOUTH + - data_key: fluid_port_5 + data_source: executor + data_type: fluid + description: 八通阀门端口5 + handler_key: '5' + io_type: source + label: '5' + side: EAST + - data_key: fluid_port_6 + data_source: executor + data_type: fluid + description: 八通阀门端口6 + handler_key: '6' + io_type: source + label: '6' + side: NORTH + - data_key: fluid_port_6 + data_source: executor + data_type: fluid + description: 六通阀门端口6-特殊输入 + handler_key: '6' + io_type: target + label: 6-in + side: WEST + icon: '' + init_param_schema: + config: + properties: + address: + default: '1' + type: string + max_volume: + default: 25.0 + type: number + mode: + type: string + port: + type: string + required: + - port + type: object + data: + properties: + max_velocity: + type: number + mode: + type: integer + plunger_position: + type: string + position: + type: number + status: + type: string + valve_position: + type: string + velocity_end: + type: string + velocity_grade: + type: string + velocity_init: + type: string + required: + - status + - mode + - max_velocity + - velocity_grade + - velocity_init + - velocity_end + - valve_position + - position + - plunger_position + type: object + version: 1.0.0 +syringe_pump_with_valve.runze.SY03B-T08: + category: + - pump_and_valve + class: + action_value_mappings: + auto-close: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: close的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: close参数 + type: object + type: UniLabJsonCommand + auto-initialize: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: initialize的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: initialize参数 + type: object + type: UniLabJsonCommand + auto-pull_plunger: + feedback: {} + goal: {} + goal_default: + volume: null + handles: [] + result: {} + schema: + description: pull_plunger的参数schema + properties: + feedback: {} + goal: + properties: + volume: + type: number + required: + - volume + type: object + result: {} + required: + - goal + title: pull_plunger参数 + type: object + type: UniLabJsonCommand + auto-push_plunger: + feedback: {} + goal: {} + goal_default: + volume: null + handles: [] + result: {} + schema: + description: push_plunger的参数schema + properties: + feedback: {} + goal: + properties: + volume: + type: number + required: + - volume + type: object + result: {} + required: + - goal + title: push_plunger参数 + type: object + type: UniLabJsonCommand + auto-query_aux_input_status_1: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: query_aux_input_status_1的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: query_aux_input_status_1参数 + type: object + type: UniLabJsonCommand + auto-query_aux_input_status_2: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: query_aux_input_status_2的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: query_aux_input_status_2参数 + type: object + type: UniLabJsonCommand + auto-query_backlash_position: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: query_backlash_position的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: query_backlash_position参数 + type: object + type: UniLabJsonCommand + auto-query_command_buffer_status: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: query_command_buffer_status的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: query_command_buffer_status参数 + type: object + type: UniLabJsonCommand + auto-query_software_version: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: query_software_version的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: query_software_version参数 + type: object + type: UniLabJsonCommand + auto-send_command: + feedback: {} + goal: {} + goal_default: + full_command: null + handles: [] + result: {} + schema: + description: send_command的参数schema + properties: + feedback: {} + goal: + properties: + full_command: + type: string + required: + - full_command + type: object + result: {} + required: + - goal + title: send_command参数 + type: object + type: UniLabJsonCommand + auto-set_baudrate: + feedback: {} + goal: {} + goal_default: + baudrate: null + handles: [] + result: {} + schema: + description: set_baudrate的参数schema + properties: + feedback: {} + goal: + properties: + baudrate: + type: string + required: + - baudrate + type: object + result: {} + required: + - goal + title: set_baudrate参数 + type: object + type: UniLabJsonCommand + auto-set_max_velocity: + feedback: {} + goal: {} + goal_default: + velocity: null + handles: [] + result: {} + schema: + description: set_max_velocity的参数schema + properties: + feedback: {} + goal: + properties: + velocity: + type: number + required: + - velocity + type: object + result: {} + required: + - goal + title: set_max_velocity参数 + type: object + type: UniLabJsonCommand + auto-set_position: + feedback: {} + goal: {} + goal_default: + max_velocity: null + position: null + handles: [] + result: {} + schema: + description: set_position的参数schema + properties: + feedback: {} + goal: + properties: + max_velocity: + type: number + position: + type: number + required: + - position + type: object + result: {} + required: + - goal + title: set_position参数 + type: object + type: UniLabJsonCommand + auto-set_valve_position: + feedback: {} + goal: {} + goal_default: + position: null + handles: [] + result: {} + schema: + description: set_valve_position的参数schema + properties: + feedback: {} + goal: + properties: + position: + type: string + required: + - position + type: object + result: {} + required: + - goal + title: set_valve_position参数 + type: object + type: UniLabJsonCommand + auto-set_velocity_grade: + feedback: {} + goal: {} + goal_default: + velocity: null + handles: [] + result: {} + schema: + description: set_velocity_grade的参数schema + properties: + feedback: {} + goal: + properties: + velocity: + type: string + required: + - velocity + type: object + result: {} + required: + - goal + title: set_velocity_grade参数 + type: object + type: UniLabJsonCommand + auto-stop_operation: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: stop_operation的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: stop_operation参数 + type: object + type: UniLabJsonCommand + auto-wait_error: + feedback: {} + goal: {} + goal_default: {} + handles: [] + result: {} + schema: + description: wait_error的参数schema + properties: + feedback: {} + goal: + properties: {} + required: [] + type: object + result: {} + required: + - goal + title: wait_error参数 + type: object + type: UniLabJsonCommand + hardware_interface: + name: hardware_interface + read: send_command + write: send_command + module: unilabos.devices.pump_and_valve.runze_backbone:RunzeSyringePump + status_types: + max_velocity: float + mode: int + plunger_position: String + position: float + status: str + valve_position: str + velocity_end: String + velocity_grade: String + velocity_init: String + type: python + config_info: [] + description: 润泽精密注射泵设备,集成阀门控制的高精度流体输送系统。该设备通过串口通信控制,支持多种运行模式和精确的体积控制。具备可变速度控制、精密定位、阀门切换、实时状态监控等功能。适用于微量液体输送、精密进样、流速控制、化学反应进料等需要高精度流体操作的实验室自动化应用。 + handles: + - data_key: fluid_port_1 + data_source: executor + data_type: fluid + description: 八通阀门端口1 + handler_key: '1' + io_type: source + label: '1' + side: NORTH + - data_key: fluid_port_2 + data_source: executor + data_type: fluid + description: 八通阀门端口2 + handler_key: '2' + io_type: source + label: '2' + side: EAST + - data_key: fluid_port_3 + data_source: executor + data_type: fluid + description: 八通阀门端口3 + handler_key: '3' + io_type: source + label: '3' + side: EAST + - data_key: fluid_port_4 + data_source: executor + data_type: fluid + description: 八通阀门端口4 + handler_key: '4' + io_type: source + label: '4' + side: SOUTH + - data_key: fluid_port_5 + data_source: executor + data_type: fluid + description: 八通阀门端口5 + handler_key: '5' + io_type: source + label: '5' + side: SOUTH + - data_key: fluid_port_6 + data_source: executor + data_type: fluid + description: 八通阀门端口6 + handler_key: '6' + io_type: source + label: '6' + side: WEST + - data_key: fluid_port_7 + data_source: executor + data_type: fluid + description: 八通阀门端口7 + handler_key: '7' + io_type: source + label: '7' + side: WEST + - data_key: fluid_port_8 + data_source: executor + data_type: fluid + description: 八通阀门端口8-特殊输入 + handler_key: '8' + io_type: target + label: '8' + side: WEST + - data_key: fluid_port_8 + data_source: executor + data_type: fluid + description: 八通阀门端口8 + handler_key: '8' + io_type: source + label: '8' + side: NORTH + icon: '' init_param_schema: config: properties: diff --git a/unilabos/ros/nodes/presets/protocol_node.py b/unilabos/ros/nodes/presets/protocol_node.py index 58734a3c..a0420130 100644 --- a/unilabos/ros/nodes/presets/protocol_node.py +++ b/unilabos/ros/nodes/presets/protocol_node.py @@ -225,7 +225,8 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): self.lab_logger().info(f"Working on physical setup: {physical_setup_graph}") protocol_steps = protocol_steps_generator(G=physical_setup_graph, **protocol_kwargs) - self.lab_logger().info(f"Goal received: {protocol_kwargs}, running steps: \n{protocol_steps}") + self.lab_logger().info(f"Goal received: {protocol_kwargs}, running steps: " + f"{json.dumps([step for step in protocol_steps if 'log_message' not in step['action_kwargs']], indent=4)}") time_start = time.time() time_overall = 100 diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index da6b2528..0e84683e 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -6,6 +6,7 @@ from pylabrobot.resources import Resource as PLRResource, Plate, TipRack, Coordi from unilabos.ros.nodes.presets.protocol_node import ROS2ProtocolNode from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker + class WorkStationContainer(Plate, TipRack): """ WorkStation 专用 Container 类,继承自 Plate和TipRack @@ -81,4 +82,5 @@ class WorkStationExample(ROS2ProtocolNode): """ to_base_plate.assign_child_resource(from_plate, Coordinate.zero()) - pass \ No newline at end of file + pass +