diff --git a/scripts/workflow.py b/scripts/workflow.py index be7bbd1e..8bd89640 100644 --- a/scripts/workflow.py +++ b/scripts/workflow.py @@ -2,7 +2,6 @@ import json import logging import traceback import uuid -import xml.etree.ElementTree as ET from typing import Any, Dict, List import networkx as nx @@ -25,7 +24,15 @@ class SimpleGraph: def add_edge(self, source, target, **attrs): """添加边""" - edge = {"source": source, "target": target, **attrs} + # edge = {"source": source, "target": target, **attrs} + edge = { + "source": source, "target": target, + "source_node_uuid": source, + "target_node_uuid": target, + "source_handle_io": "source", + "target_handle_io": "target", + **attrs + } self.edges.append(edge) def to_dict(self): @@ -42,6 +49,7 @@ class SimpleGraph: "multigraph": False, "graph": {}, "nodes": nodes_list, + "edges": self.edges, "links": self.edges, } @@ -58,495 +66,8 @@ def extract_json_from_markdown(text: str) -> str: return text -def convert_to_type(val: str) -> Any: - """将字符串值转换为适当的数据类型""" - if val == "True": - return True - if val == "False": - return False - if val == "?": - return None - if val.endswith(" g"): - return float(val.split(" ")[0]) - if val.endswith("mg"): - return float(val.split("mg")[0]) - elif val.endswith("mmol"): - return float(val.split("mmol")[0]) / 1000 - elif val.endswith("mol"): - return float(val.split("mol")[0]) - elif val.endswith("ml"): - return float(val.split("ml")[0]) - elif val.endswith("RPM"): - return float(val.split("RPM")[0]) - elif val.endswith(" °C"): - return float(val.split(" ")[0]) - elif val.endswith(" %"): - return float(val.split(" ")[0]) - return val -def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """统一的数据重构函数,根据操作类型自动选择模板""" - refactored_data = [] - - # 定义操作映射,包含生物实验和有机化学的所有操作 - OPERATION_MAPPING = { - # 生物实验操作 - "transfer_liquid": "SynBioFactory-liquid_handler.prcxi-transfer_liquid", - "transfer": "SynBioFactory-liquid_handler.biomek-transfer", - "incubation": "SynBioFactory-liquid_handler.biomek-incubation", - "move_labware": "SynBioFactory-liquid_handler.biomek-move_labware", - "oscillation": "SynBioFactory-liquid_handler.biomek-oscillation", - # 有机化学操作 - "HeatChillToTemp": "SynBioFactory-workstation-HeatChillProtocol", - "StopHeatChill": "SynBioFactory-workstation-HeatChillStopProtocol", - "StartHeatChill": "SynBioFactory-workstation-HeatChillStartProtocol", - "HeatChill": "SynBioFactory-workstation-HeatChillProtocol", - "Dissolve": "SynBioFactory-workstation-DissolveProtocol", - "Transfer": "SynBioFactory-workstation-TransferProtocol", - "Evaporate": "SynBioFactory-workstation-EvaporateProtocol", - "Recrystallize": "SynBioFactory-workstation-RecrystallizeProtocol", - "Filter": "SynBioFactory-workstation-FilterProtocol", - "Dry": "SynBioFactory-workstation-DryProtocol", - "Add": "SynBioFactory-workstation-AddProtocol", - } - - UNSUPPORTED_OPERATIONS = ["Purge", "Wait", "Stir", "ResetHandling"] - - for step in data: - operation = step.get("action") - if not operation or operation in UNSUPPORTED_OPERATIONS: - continue - - # 处理重复操作 - if operation == "Repeat": - times = step.get("times", step.get("parameters", {}).get("times", 1)) - sub_steps = step.get("steps", step.get("parameters", {}).get("steps", [])) - for i in range(int(times)): - sub_data = refactor_data(sub_steps) - refactored_data.extend(sub_data) - continue - - # 获取模板名称 - template = OPERATION_MAPPING.get(operation) - if not template: - # 自动推断模板类型 - if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]: - template = f"SynBioFactory-liquid_handler.biomek-{operation}" - else: - template = f"SynBioFactory-workstation-{operation}Protocol" - - # 创建步骤数据 - step_data = { - "template": template, - "description": step.get("description", step.get("purpose", f"{operation} operation")), - "lab_node_type": "Device", - "parameters": step.get("parameters", step.get("action_args", {})), - } - refactored_data.append(step_data) - - return refactored_data - - -def build_protocol_graph( - labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str -) -> SimpleGraph: - """统一的协议图构建函数,根据设备类型自动选择构建逻辑""" - G = SimpleGraph() - resource_last_writer = {} - LAB_NAME = "SynBioFactory" - - protocol_steps = refactor_data(protocol_steps) - - # 检查协议步骤中的模板来判断协议类型 - has_biomek_template = any( - ("biomek" in step.get("template", "")) or ("prcxi" in step.get("template", "")) - for step in protocol_steps - ) - - if has_biomek_template: - # 生物实验协议图构建 - for labware_id, labware in labware_info.items(): - node_id = str(uuid.uuid4()) - - labware_attrs = labware.copy() - labware_id = labware_attrs.pop("id", labware_attrs.get("name", f"labware_{uuid.uuid4()}")) - labware_attrs["description"] = labware_id - labware_attrs["lab_node_type"] = ( - "Reagent" if "Plate" in str(labware_id) else "Labware" if "Rack" in str(labware_id) else "Sample" - ) - labware_attrs["device_id"] = workstation_name - - G.add_node(node_id, template=f"{LAB_NAME}-host_node-create_resource", **labware_attrs) - resource_last_writer[labware_id] = f"{node_id}:labware" - - # 处理协议步骤 - prev_node = None - for i, step in enumerate(protocol_steps): - node_id = str(uuid.uuid4()) - G.add_node(node_id, **step) - - # 添加控制流边 - if prev_node is not None: - G.add_edge(prev_node, node_id, source_port="ready", target_port="ready") - prev_node = node_id - - # 处理物料流 - params = step.get("parameters", {}) - if "sources" in params and params["sources"] in resource_last_writer: - source_node, source_port = resource_last_writer[params["sources"]].split(":") - G.add_edge(source_node, node_id, source_port=source_port, target_port="labware") - - if "targets" in params: - resource_last_writer[params["targets"]] = f"{node_id}:labware" - - # 添加协议结束节点 - end_id = str(uuid.uuid4()) - G.add_node(end_id, template=f"{LAB_NAME}-liquid_handler.biomek-run_protocol") - if prev_node is not None: - G.add_edge(prev_node, end_id, source_port="ready", target_port="ready") - - else: - # 有机化学协议图构建 - WORKSTATION_ID = workstation_name - - # 为所有labware创建资源节点 - for item_id, item in labware_info.items(): - # item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}") - node_id = str(uuid.uuid4()) - - # 判断节点类型 - if item.get("type") == "hardware" or "reactor" in str(item_id).lower(): - if "reactor" not in str(item_id).lower(): - continue - lab_node_type = "Sample" - description = f"Prepare Reactor: {item_id}" - liquid_type = [] - liquid_volume = [] - else: - lab_node_type = "Reagent" - description = f"Add Reagent to Flask: {item_id}" - liquid_type = [item_id] - liquid_volume = [1e5] - - G.add_node( - node_id, - template=f"{LAB_NAME}-host_node-create_resource", - description=description, - lab_node_type=lab_node_type, - res_id=item_id, - device_id=WORKSTATION_ID, - class_name="container", - parent=WORKSTATION_ID, - bind_locations={"x": 0.0, "y": 0.0, "z": 0.0}, - liquid_input_slot=[-1], - liquid_type=liquid_type, - liquid_volume=liquid_volume, - slot_on_deck="", - role=item.get("role", ""), - ) - resource_last_writer[item_id] = f"{node_id}:labware" - - last_control_node_id = None - - # 处理协议步骤 - for step in protocol_steps: - node_id = str(uuid.uuid4()) - G.add_node(node_id, **step) - - # 控制流 - if last_control_node_id is not None: - G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready") - last_control_node_id = node_id - - # 物料流 - params = step.get("parameters", {}) - input_resources = { - "Vessel": params.get("vessel"), - "ToVessel": params.get("to_vessel"), - "FromVessel": params.get("from_vessel"), - "reagent": params.get("reagent"), - "solvent": params.get("solvent"), - "compound": params.get("compound"), - "sources": params.get("sources"), - "targets": params.get("targets"), - } - - for target_port, resource_name in input_resources.items(): - if resource_name and resource_name in resource_last_writer: - source_node, source_port = resource_last_writer[resource_name].split(":") - G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port) - - output_resources = { - "VesselOut": params.get("vessel"), - "FromVesselOut": params.get("from_vessel"), - "ToVesselOut": params.get("to_vessel"), - "FiltrateOut": params.get("filtrate_vessel"), - "reagent": params.get("reagent"), - "solvent": params.get("solvent"), - "compound": params.get("compound"), - "sources_out": params.get("sources"), - "targets_out": params.get("targets"), - } - - for source_port, resource_name in output_resources.items(): - if resource_name: - resource_last_writer[resource_name] = f"{node_id}:{source_port}" - - return G - - -def draw_protocol_graph(protocol_graph: SimpleGraph, output_path: str): - """ - (辅助功能) 使用 networkx 和 matplotlib 绘制协议工作流图,用于可视化。 - """ - if not protocol_graph: - print("Cannot draw graph: Graph object is empty.") - return - - G = nx.DiGraph() - - for node_id, attrs in protocol_graph.nodes.items(): - label = attrs.get("description", attrs.get("template", node_id[:8])) - G.add_node(node_id, label=label, **attrs) - - for edge in protocol_graph.edges: - G.add_edge(edge["source"], edge["target"]) - - plt.figure(figsize=(20, 15)) - try: - pos = nx.nx_agraph.graphviz_layout(G, prog="dot") - except Exception: - pos = nx.shell_layout(G) # Fallback layout - - node_labels = {node: data["label"] for node, data in G.nodes(data=True)} - nx.draw( - G, - pos, - with_labels=False, - node_size=2500, - node_color="skyblue", - node_shape="o", - edge_color="gray", - width=1.5, - arrowsize=15, - ) - nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=8, font_weight="bold") - - plt.title("Chemical Protocol Workflow Graph", size=15) - plt.savefig(output_path, dpi=300, bbox_inches="tight") - plt.close() - print(f" - Visualization saved to '{output_path}'") - - -from networkx.drawing.nx_agraph import to_agraph -import re - -COMPASS = {"n","e","s","w","ne","nw","se","sw","c"} - -def _is_compass(port: str) -> bool: - return isinstance(port, str) and port.lower() in COMPASS - -def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"): - """ - 使用 Graphviz 端口语法绘制协议工作流图。 - - 若边上的 source_port/target_port 是 compass(n/e/s/w/...),直接用 compass。 - - 否则自动为节点创建 record 形状并定义命名端口 。 - 最终由 PyGraphviz 渲染并输出到 output_path(后缀决定格式,如 .png/.svg/.pdf)。 - """ - if not protocol_graph: - print("Cannot draw graph: Graph object is empty.") - return - - # 1) 先用 networkx 搭建有向图,保留端口属性 - G = nx.DiGraph() - for node_id, attrs in protocol_graph.nodes.items(): - label = attrs.get("description", attrs.get("template", node_id[:8])) - # 保留一个干净的“中心标签”,用于放在 record 的中间槽 - G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)}) - - edges_data = [] - in_ports_by_node = {} # 收集命名输入端口 - out_ports_by_node = {} # 收集命名输出端口 - - for edge in protocol_graph.edges: - u = edge["source"] - v = edge["target"] - sp = edge.get("source_port") - tp = edge.get("target_port") - - # 记录到图里(保留原始端口信息) - G.add_edge(u, v, source_port=sp, target_port=tp) - edges_data.append((u, v, sp, tp)) - - # 如果不是 compass,就按“命名端口”先归类,等会儿给节点造 record - if sp and not _is_compass(sp): - out_ports_by_node.setdefault(u, set()).add(str(sp)) - if tp and not _is_compass(tp): - in_ports_by_node.setdefault(v, set()).add(str(tp)) - - # 2) 转为 AGraph,使用 Graphviz 渲染 - A = to_agraph(G) - A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10") - A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica") - A.edge_attr.update(arrowsize="0.8", color="#666666") - - # 3) 为需要命名端口的节点设置 record 形状与 label - # 左列 = 输入端口;中间 = 核心标签;右列 = 输出端口 - for n in A.nodes(): - node = A.get_node(n) - core = G.nodes[n].get("_core_label", n) - - in_ports = sorted(in_ports_by_node.get(n, [])) - out_ports = sorted(out_ports_by_node.get(n, [])) - - # 如果该节点涉及命名端口,则用 record;否则保留原 box - if in_ports or out_ports: - def port_fields(ports): - if not ports: - return " " # 必须留一个空槽占位 - # 每个端口一个小格子,

name - return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports) - - left = port_fields(in_ports) - right = port_fields(out_ports) - - # 三栏:左(入) | 中(节点名) | 右(出) - record_label = f"{{ {left} | {core} | {right} }}" - node.attr.update(shape="record", label=record_label) - else: - # 没有命名端口:普通盒子,显示核心标签 - node.attr.update(label=str(core)) - - # 4) 给边设置 headport / tailport - # - 若端口为 compass:直接用 compass(e.g., headport="e") - # - 若端口为命名端口:使用在 record 中定义的 名(同名即可) - for (u, v, sp, tp) in edges_data: - e = A.get_edge(u, v) - - # Graphviz 属性:tail 是源,head 是目标 - if sp: - if _is_compass(sp): - e.attr["tailport"] = sp.lower() - else: - # 与 record label 中 名一致;特殊字符已在 label 中做了清洗 - e.attr["tailport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(sp)) - - if tp: - if _is_compass(tp): - e.attr["headport"] = tp.lower() - else: - e.attr["headport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(tp)) - - # 可选:若想让边更贴边缘,可设置 constraint/spline 等 - # e.attr["arrowhead"] = "vee" - - # 5) 输出 - A.draw(output_path, prog="dot") - print(f" - Port-aware workflow rendered to '{output_path}'") - - -def flatten_xdl_procedure(procedure_elem: ET.Element) -> List[ET.Element]: - """展平嵌套的XDL程序结构""" - flattened_operations = [] - TEMP_UNSUPPORTED_PROTOCOL = ["Purge", "Wait", "Stir", "ResetHandling"] - - def extract_operations(element: ET.Element): - if element.tag not in ["Prep", "Reaction", "Workup", "Purification", "Procedure"]: - if element.tag not in TEMP_UNSUPPORTED_PROTOCOL: - flattened_operations.append(element) - - for child in element: - extract_operations(child) - - for child in procedure_elem: - extract_operations(child) - - return flattened_operations - - -def parse_xdl_content(xdl_content: str) -> tuple: - """解析XDL内容""" - try: - xdl_content_cleaned = "".join(c for c in xdl_content if c.isprintable()) - root = ET.fromstring(xdl_content_cleaned) - - synthesis_elem = root.find("Synthesis") - if synthesis_elem is None: - return None, None, None - - # 解析硬件组件 - hardware_elem = synthesis_elem.find("Hardware") - hardware = [] - if hardware_elem is not None: - hardware = [{"id": c.get("id"), "type": c.get("type")} for c in hardware_elem.findall("Component")] - - # 解析试剂 - reagents_elem = synthesis_elem.find("Reagents") - reagents = [] - if reagents_elem is not None: - reagents = [{"name": r.get("name"), "role": r.get("role", "")} for r in reagents_elem.findall("Reagent")] - - # 解析程序 - procedure_elem = synthesis_elem.find("Procedure") - if procedure_elem is None: - return None, None, None - - flattened_operations = flatten_xdl_procedure(procedure_elem) - return hardware, reagents, flattened_operations - - except ET.ParseError as e: - raise ValueError(f"Invalid XDL format: {e}") - - -def convert_xdl_to_dict(xdl_content: str) -> Dict[str, Any]: - """ - 将XDL XML格式转换为标准的字典格式 - - Args: - xdl_content: XDL XML内容 - - Returns: - 转换结果,包含步骤和器材信息 - """ - try: - hardware, reagents, flattened_operations = parse_xdl_content(xdl_content) - if hardware is None: - return {"error": "Failed to parse XDL content", "success": False} - - # 将XDL元素转换为字典格式 - steps_data = [] - for elem in flattened_operations: - # 转换参数类型 - parameters = {} - for key, val in elem.attrib.items(): - converted_val = convert_to_type(val) - if converted_val is not None: - parameters[key] = converted_val - - step_dict = { - "operation": elem.tag, - "parameters": parameters, - "description": elem.get("purpose", f"Operation: {elem.tag}"), - } - steps_data.append(step_dict) - - # 合并硬件和试剂为统一的labware_info格式 - labware_data = [] - labware_data.extend({"id": hw["id"], "type": "hardware", **hw} for hw in hardware) - labware_data.extend({"name": reagent["name"], "type": "reagent", **reagent} for reagent in reagents) - - return { - "success": True, - "steps": steps_data, - "labware": labware_data, - "message": f"Successfully converted XDL to dict format. Found {len(steps_data)} steps and {len(labware_data)} labware items.", - } - - except Exception as e: - error_msg = f"XDL conversion failed: {str(e)}" - logger.error(error_msg) - return {"error": error_msg, "success": False} def create_workflow( diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 58875524..08595de5 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -49,6 +49,8 @@ def convert_argv_dashes_to_underscores(args: argparse.ArgumentParser): def parse_args(): """解析命令行参数""" parser = argparse.ArgumentParser(description="Start Uni-Lab Edge server.") + subparsers = parser.add_subparsers(title="Valid subcommands", dest="command") + parser.add_argument("-g", "--graph", help="Physical setup graph file path.") parser.add_argument("-c", "--controllers", default=None, help="Controllers config file path.") parser.add_argument( @@ -153,6 +155,14 @@ def parse_args(): default=False, help="Complete registry information", ) + + # label + workflow_parser = subparsers.add_parser( + "workflow_upload", + help="Upload workflow from xdl/json/python files", + ) + workflow_parser.add_argument("-t", "--labeltype", default="singlepoint", type=str, + help="QM calculation type, support 'singlepoint', 'optimize' and 'dimer' currently") return parser @@ -163,6 +173,9 @@ def main(): convert_argv_dashes_to_underscores(args) args_dict = vars(args.parse_args()) + # 显示启动横幅 + print_unilab_banner(args_dict) + # 环境检查 - 检查并自动安装必需的包 (可选) if not args_dict.get("skip_env_check", False): from unilabos.utils.environment_check import check_environment @@ -239,7 +252,18 @@ def main(): if args_dict.get("sk", ""): BasicConfig.sk = args_dict.get("sk", "") print_status("传入了sk参数,优先采用传入参数!", "info") + BasicConfig.working_dir = working_dir + # 显示启动横幅 + print_unilab_banner(args_dict) + + ##################################### + ######## 启动设备接入端(主入口) ######## + ##################################### + launch(args_dict) + + +def launch(args_dict: Dict[str, Any]): # 使用远程资源启动 if args_dict["use_remote_resource"]: print_status("使用远程资源启动", "info") @@ -254,7 +278,6 @@ def main(): BasicConfig.port = args_dict["port"] if args_dict["port"] else BasicConfig.port BasicConfig.disable_browser = args_dict["disable_browser"] or BasicConfig.disable_browser - BasicConfig.working_dir = working_dir BasicConfig.is_host_mode = not args_dict.get("is_slave", False) BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) BasicConfig.upload_registry = args_dict.get("upload_registry", False) @@ -278,9 +301,6 @@ def main(): from unilabos.resources.graphio import modify_to_backend_format from unilabos.ros.nodes.resource_tracker import ResourceTreeSet, ResourceDict - # 显示启动横幅 - print_unilab_banner(args_dict) - # 注册表 lab_registry = build_registry( args_dict["registry_path"], args_dict.get("complete_registry", False), args_dict["upload_registry"] diff --git a/unilabos/test/workflow/merge_workflow.py b/unilabos/test/workflow/merge_workflow.py index fb409769..3d3c6586 100644 --- a/unilabos/test/workflow/merge_workflow.py +++ b/unilabos/test/workflow/merge_workflow.py @@ -9,7 +9,7 @@ if str(ROOT_DIR) not in sys.path: import pytest -from scripts.workflow import build_protocol_graph, draw_protocol_graph, draw_protocol_graph_with_ports +from unilabos.workflow.common import build_protocol_graph, draw_protocol_graph, draw_protocol_graph_with_ports ROOT_DIR = Path(__file__).resolve().parents[2] diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py new file mode 100644 index 00000000..b884a706 --- /dev/null +++ b/unilabos/workflow/common.py @@ -0,0 +1,484 @@ +import re +import uuid + +import networkx as nx +from networkx.drawing.nx_agraph import to_agraph +import matplotlib.pyplot as plt +from typing import Dict, List, Any, Tuple, Optional + +Json = Dict[str, Any] + +# ---------------- Graph ---------------- + +class WorkflowGraph: + """简单的有向图实现:使用 params 单层参数;inputs 内含连线;支持 node-link 导出""" + + def __init__(self): + self.nodes: Dict[str, Dict[str, Any]] = {} + self.edges: List[Dict[str, Any]] = [] + + def add_node(self, node_id: str, **attrs): + self.nodes[node_id] = attrs + + def add_edge(self, source: str, target: str, **attrs): + edge = { + "source": source, + "target": target, + "source_node_uuid": source, + "target_node_uuid": target, + "source_handle_io": attrs.pop("source_handle_io", "source"), + "target_handle_io": attrs.pop("target_handle_io", "target"), + **attrs + } + self.edges.append(edge) + + def _materialize_wiring_into_inputs(self, obj: Any, inputs: Dict[str, Any], + variable_sources: Dict[str, Dict[str, Any]], + target_node_id: str, base_path: List[str]): + has_var = False + + def walk(node: Any, path: List[str]): + nonlocal has_var + if isinstance(node, dict): + if "__var__" in node: + has_var = True + varname = node["__var__"] + placeholder = f"${{{varname}}}" + src = variable_sources.get(varname) + if src: + key = ".".join(path) # e.g. "params.foo.bar.0" + inputs[key] = {"node": src["node_id"], "output": src.get("output_name", "result")} + self.add_edge(str(src["node_id"]), target_node_id, + source_handle_io=src.get("output_name", "result"), + target_handle_io=key) + return placeholder + return {k: walk(v, path + [k]) for k, v in node.items()} + if isinstance(node, list): + return [walk(v, path + [str(i)]) for i, v in enumerate(node)] + return node + + replaced = walk(obj, base_path[:]) + return replaced, has_var + + def add_workflow_node(self, + node_id: int, + *, + device_key: Optional[str] = None, # 实例名,如 "ser" + resource_name: Optional[str] = None, # registry key(原 device_class) + module: Optional[str] = None, + template_name: Optional[str] = None, # 动作/模板名(原 action_key) + params: Dict[str, Any], + variable_sources: Dict[str, Dict[str, Any]], + add_ready_if_no_vars: bool = True, + prev_node_id: Optional[int] = None, + **extra_attrs) -> None: + """添加工作流节点:params 单层;自动变量连线与 ready 串联;支持附加属性""" + node_id_str = str(node_id) + inputs: Dict[str, Any] = {} + + params, has_var = self._materialize_wiring_into_inputs( + params, inputs, variable_sources, node_id_str, base_path=["params"] + ) + + if add_ready_if_no_vars and not has_var: + last_id = str(prev_node_id) if prev_node_id is not None else "-1" + inputs["ready"] = {"node": int(last_id), "output": "ready"} + self.add_edge(last_id, node_id_str, source_handle_io="ready", target_handle_io="ready") + + node_obj = { + "device_key": device_key, + "resource_name": resource_name, # ✅ 新名字 + "module": module, + "template_name": template_name, # ✅ 新名字 + "params": params, + "inputs": inputs, + } + node_obj.update(extra_attrs or {}) + self.add_node(node_id_str, parameters=node_obj) + + # 顺序工作流导出(连线在 inputs,不返回 edges) + def to_dict(self) -> List[Dict[str, Any]]: + result = [] + for node_id, attrs in self.nodes.items(): + node = {"id": node_id} + params = dict(attrs.get("parameters", {}) or {}) + flat = {k: v for k, v in attrs.items() if k != "parameters"} + flat.update(params) + node.update(flat) + result.append(node) + return sorted(result, key=lambda n: int(n["id"]) if str(n["id"]).isdigit() else n["id"]) + + # node-link 导出(含 edges) + def to_node_link_dict(self) -> Dict[str, Any]: + nodes_list = [] + for node_id, attrs in self.nodes.items(): + node_attrs = attrs.copy() + params = node_attrs.pop("parameters", {}) or {} + node_attrs.update(params) + nodes_list.append({"id": node_id, **node_attrs}) + return {"directed": True, "multigraph": False, "graph": {}, "nodes": nodes_list, "edges": self.edges, "links": self.edges} + + +def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """统一的数据重构函数,根据操作类型自动选择模板""" + refactored_data = [] + + # 定义操作映射,包含生物实验和有机化学的所有操作 + OPERATION_MAPPING = { + # 生物实验操作 + "transfer_liquid": "transfer_liquid", + "transfer": "transfer", + "incubation": "incubation", + "move_labware": "move_labware", + "oscillation": "oscillation", + # 有机化学操作 + "HeatChillToTemp": "HeatChillProtocol", + "StopHeatChill": "HeatChillStopProtocol", + "StartHeatChill": "HeatChillStartProtocol", + "HeatChill": "HeatChillProtocol", + "Dissolve": "DissolveProtocol", + "Transfer": "TransferProtocol", + "Evaporate": "EvaporateProtocol", + "Recrystallize": "RecrystallizeProtocol", + "Filter": "FilterProtocol", + "Dry": "DryProtocol", + "Add": "AddProtocol", + } + + UNSUPPORTED_OPERATIONS = ["Purge", "Wait", "Stir", "ResetHandling"] + + for step in data: + operation = step.get("action") + if not operation or operation in UNSUPPORTED_OPERATIONS: + continue + + # 处理重复操作 + if operation == "Repeat": + times = step.get("times", step.get("parameters", {}).get("times", 1)) + sub_steps = step.get("steps", step.get("parameters", {}).get("steps", [])) + for i in range(int(times)): + sub_data = refactor_data(sub_steps) + refactored_data.extend(sub_data) + continue + + # 获取模板名称 + template = OPERATION_MAPPING.get(operation) + if not template: + # 自动推断模板类型 + if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]: + template = f"biomek-{operation}" + else: + template = f"{operation}Protocol" + + # 创建步骤数据 + step_data = { + "template": template, + "description": step.get("description", step.get("purpose", f"{operation} operation")), + "lab_node_type": "Device", + "parameters": step.get("parameters", step.get("action_args", {})), + } + refactored_data.append(step_data) + + return refactored_data + + +def build_protocol_graph( + labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str +) -> WorkflowGraph: + """统一的协议图构建函数,根据设备类型自动选择构建逻辑""" + G = WorkflowGraph() + resource_last_writer = {} + + protocol_steps = refactor_data(protocol_steps) + # 有机化学&移液站协议图构建 + WORKSTATION_ID = workstation_name + + # 为所有labware创建资源节点 + for labware_id, item in labware_info.items(): + # item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}") + node_id = str(uuid.uuid4()) + + # 判断节点类型 + if "Rack" in str(labware_id) or "Tip" in str(labware_id): + lab_node_type = "Labware" + description = f"Prepare Labware: {labware_id}" + liquid_type = [] + liquid_volume = [] + elif item.get("type") == "hardware" or "reactor" in str(labware_id).lower(): + if "reactor" not in str(labware_id).lower(): + continue + lab_node_type = "Sample" + description = f"Prepare Reactor: {labware_id}" + liquid_type = [] + liquid_volume = [] + else: + lab_node_type = "Reagent" + description = f"Add Reagent to Flask: {labware_id}" + liquid_type = [labware_id] + liquid_volume = [1e5] + + G.add_node( + node_id, + template_name=f"create_resource", + resource_name="host_node", + description=description, + lab_node_type=lab_node_type, + params={ + "res_id": labware_id, + "device_id": WORKSTATION_ID, + "class_name": "container", + "parent": WORKSTATION_ID, + "bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0}, + "liquid_input_slot": [-1], + "liquid_type": liquid_type, + "liquid_volume": liquid_volume, + "slot_on_deck": "", + }, + role=item.get("role", ""), + ) + resource_last_writer[labware_id] = f"{node_id}:labware" + + last_control_node_id = None + + # 处理协议步骤 + for step in protocol_steps: + node_id = str(uuid.uuid4()) + G.add_node(node_id, **step) + + # 控制流 + if last_control_node_id is not None: + G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready") + last_control_node_id = node_id + + # 物料流 + params = step.get("parameters", {}) + input_resources_possible_names = [ + "vessel", + "to_vessel", + "from_vessel", + "reagent", + "solvent", + "compound", + "sources", + "targets", + ] + + for target_port in input_resources_possible_names: + resource_name = params.get(target_port) + if resource_name and resource_name in resource_last_writer: + source_node, source_port = resource_last_writer[resource_name].split(":") + G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port) + + output_resources = { + "vessel_out": params.get("vessel"), + "from_vessel_out": params.get("from_vessel"), + "to_vessel_out": params.get("to_vessel"), + "filtrate_out": params.get("filtrate_vessel"), + "reagent": params.get("reagent"), + "solvent": params.get("solvent"), + "compound": params.get("compound"), + "sources_out": params.get("sources"), + "targets_out": params.get("targets"), + } + + for source_port, resource_name in output_resources.items(): + if resource_name: + resource_last_writer[resource_name] = f"{node_id}:{source_port}" + + return G + + +def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str): + """ + (辅助功能) 使用 networkx 和 matplotlib 绘制协议工作流图,用于可视化。 + """ + if not protocol_graph: + print("Cannot draw graph: Graph object is empty.") + return + + G = nx.DiGraph() + + for node_id, attrs in protocol_graph.nodes.items(): + label = attrs.get("description", attrs.get("template", node_id[:8])) + G.add_node(node_id, label=label, **attrs) + + for edge in protocol_graph.edges: + G.add_edge(edge["source"], edge["target"]) + + plt.figure(figsize=(20, 15)) + try: + pos = nx.nx_agraph.graphviz_layout(G, prog="dot") + except Exception: + pos = nx.shell_layout(G) # Fallback layout + + node_labels = {node: data["label"] for node, data in G.nodes(data=True)} + nx.draw( + G, + pos, + with_labels=False, + node_size=2500, + node_color="skyblue", + node_shape="o", + edge_color="gray", + width=1.5, + arrowsize=15, + ) + nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=8, font_weight="bold") + + plt.title("Chemical Protocol Workflow Graph", size=15) + plt.savefig(output_path, dpi=300, bbox_inches="tight") + plt.close() + print(f" - Visualization saved to '{output_path}'") + + +COMPASS = {"n","e","s","w","ne","nw","se","sw","c"} + +def _is_compass(port: str) -> bool: + return isinstance(port, str) and port.lower() in COMPASS + +def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"): + """ + 使用 Graphviz 端口语法绘制协议工作流图。 + - 若边上的 source_port/target_port 是 compass(n/e/s/w/...),直接用 compass。 + - 否则自动为节点创建 record 形状并定义命名端口 。 + 最终由 PyGraphviz 渲染并输出到 output_path(后缀决定格式,如 .png/.svg/.pdf)。 + """ + if not protocol_graph: + print("Cannot draw graph: Graph object is empty.") + return + + # 1) 先用 networkx 搭建有向图,保留端口属性 + G = nx.DiGraph() + for node_id, attrs in protocol_graph.nodes.items(): + label = attrs.get("description", attrs.get("template", node_id[:8])) + # 保留一个干净的“中心标签”,用于放在 record 的中间槽 + G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)}) + + edges_data = [] + in_ports_by_node = {} # 收集命名输入端口 + out_ports_by_node = {} # 收集命名输出端口 + + for edge in protocol_graph.edges: + u = edge["source"] + v = edge["target"] + sp = edge.get("source_port") + tp = edge.get("target_port") + + # 记录到图里(保留原始端口信息) + G.add_edge(u, v, source_port=sp, target_port=tp) + edges_data.append((u, v, sp, tp)) + + # 如果不是 compass,就按“命名端口”先归类,等会儿给节点造 record + if sp and not _is_compass(sp): + out_ports_by_node.setdefault(u, set()).add(str(sp)) + if tp and not _is_compass(tp): + in_ports_by_node.setdefault(v, set()).add(str(tp)) + + # 2) 转为 AGraph,使用 Graphviz 渲染 + A = to_agraph(G) + A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10") + A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica") + A.edge_attr.update(arrowsize="0.8", color="#666666") + + # 3) 为需要命名端口的节点设置 record 形状与 label + # 左列 = 输入端口;中间 = 核心标签;右列 = 输出端口 + for n in A.nodes(): + node = A.get_node(n) + core = G.nodes[n].get("_core_label", n) + + in_ports = sorted(in_ports_by_node.get(n, [])) + out_ports = sorted(out_ports_by_node.get(n, [])) + + # 如果该节点涉及命名端口,则用 record;否则保留原 box + if in_ports or out_ports: + def port_fields(ports): + if not ports: + return " " # 必须留一个空槽占位 + # 每个端口一个小格子,

name + return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports) + + left = port_fields(in_ports) + right = port_fields(out_ports) + + # 三栏:左(入) | 中(节点名) | 右(出) + record_label = f"{{ {left} | {core} | {right} }}" + node.attr.update(shape="record", label=record_label) + else: + # 没有命名端口:普通盒子,显示核心标签 + node.attr.update(label=str(core)) + + # 4) 给边设置 headport / tailport + # - 若端口为 compass:直接用 compass(e.g., headport="e") + # - 若端口为命名端口:使用在 record 中定义的 名(同名即可) + for (u, v, sp, tp) in edges_data: + e = A.get_edge(u, v) + + # Graphviz 属性:tail 是源,head 是目标 + if sp: + if _is_compass(sp): + e.attr["tailport"] = sp.lower() + else: + # 与 record label 中 名一致;特殊字符已在 label 中做了清洗 + e.attr["tailport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(sp)) + + if tp: + if _is_compass(tp): + e.attr["headport"] = tp.lower() + else: + e.attr["headport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(tp)) + + # 可选:若想让边更贴边缘,可设置 constraint/spline 等 + # e.attr["arrowhead"] = "vee" + + # 5) 输出 + A.draw(output_path, prog="dot") + print(f" - Port-aware workflow rendered to '{output_path}'") +# ---------------- Registry Adapter ---------------- + + +class RegistryAdapter: + """根据 module 的类名(冒号右侧)反查 registry 的 resource_name(原 device_class),并抽取参数顺序""" + def __init__(self, device_registry: Dict[str, Any]): + self.device_registry = device_registry or {} + self.module_class_to_resource = self._build_module_class_index() + + def _build_module_class_index(self) -> Dict[str, str]: + idx = {} + for resource_name, info in self.device_registry.items(): + module = info.get("module") + if isinstance(module, str) and ":" in module: + cls = module.split(":")[-1] + idx[cls] = resource_name + idx[cls.lower()] = resource_name + return idx + + def resolve_resource_by_classname(self, class_name: str) -> Optional[str]: + if not class_name: + return None + return (self.module_class_to_resource.get(class_name) + or self.module_class_to_resource.get(class_name.lower())) + + def get_device_module(self, resource_name: Optional[str]) -> Optional[str]: + if not resource_name: + return None + return self.device_registry.get(resource_name, {}).get("module") + + def get_actions(self, resource_name: Optional[str]) -> Dict[str, Any]: + if not resource_name: + return {} + return (self.device_registry.get(resource_name, {}) + .get("class", {}) + .get("action_value_mappings", {})) or {} + + def get_action_schema(self, resource_name: Optional[str], template_name: str) -> Optional[Json]: + return (self.get_actions(resource_name).get(template_name) or {}).get("schema") + + def get_action_goal_default(self, resource_name: Optional[str], template_name: str) -> Json: + return (self.get_actions(resource_name).get(template_name) or {}).get("goal_default", {}) or {} + + def get_action_input_keys(self, resource_name: Optional[str], template_name: str) -> List[str]: + schema = self.get_action_schema(resource_name, template_name) or {} + goal = (schema.get("properties") or {}).get("goal") or {} + props = goal.get("properties") or {} + required = goal.get("required") or [] + return list(dict.fromkeys(required + list(props.keys()))) diff --git a/unilabos/workflow/from_labwares_and_steps.py b/unilabos/workflow/from_labwares_and_steps.py new file mode 100644 index 00000000..7e4ec641 --- /dev/null +++ b/unilabos/workflow/from_labwares_and_steps.py @@ -0,0 +1,24 @@ +import json +from os import PathLike + +from unilabos.workflow.common import build_protocol_graph + + +def from_labwares_and_steps(data_path: PathLike): + with data_path.open("r", encoding="utf-8") as fp: + d = json.load(fp) + + if "workflow" in d and "reagent" in d: + protocol_steps = d["workflow"] + labware_info = d["reagent"] + elif "steps_info" in d and "labware_info" in d: + protocol_steps = _normalize_steps(d["steps_info"]) + labware_info = _normalize_labware(d["labware_info"]) + else: + raise ValueError("Unsupported protocol format") + + graph = build_protocol_graph( + labware_info=labware_info, + protocol_steps=protocol_steps, + workstation_name="PRCXi", + ) \ No newline at end of file diff --git a/unilabos/workflow/from_python_script.py b/unilabos/workflow/from_python_script.py new file mode 100644 index 00000000..5a8ce38e --- /dev/null +++ b/unilabos/workflow/from_python_script.py @@ -0,0 +1,241 @@ +import ast +import json +from typing import Dict, List, Any, Tuple, Optional + +from .common import WorkflowGraph, RegistryAdapter + +Json = Dict[str, Any] + +# ---------------- Converter ---------------- + +class DeviceMethodConverter: + """ + - 字段统一:resource_name(原 device_class)、template_name(原 action_key) + - params 单层;inputs 使用 'params.' 前缀 + - SimpleGraph.add_workflow_node 负责变量连线与边 + """ + def __init__(self, device_registry: Optional[Dict[str, Any]] = None): + self.graph = WorkflowGraph() + self.variable_sources: Dict[str, Dict[str, Any]] = {} # var -> {node_id, output_name} + self.instance_to_resource: Dict[str, Optional[str]] = {} # 实例名 -> resource_name + self.node_id_counter: int = 0 + self.registry = RegistryAdapter(device_registry or {}) + + # ---- helpers ---- + def _new_node_id(self) -> int: + nid = self.node_id_counter + self.node_id_counter += 1 + return nid + + def _assign_targets(self, targets) -> List[str]: + names: List[str] = [] + import ast + if isinstance(targets, ast.Tuple): + for elt in targets.elts: + if isinstance(elt, ast.Name): + names.append(elt.id) + elif isinstance(targets, ast.Name): + names.append(targets.id) + return names + + def _extract_device_instantiation(self, node) -> Optional[Tuple[str, str]]: + import ast + if not isinstance(node.value, ast.Call): + return None + callee = node.value.func + if isinstance(callee, ast.Name): + class_name = callee.id + elif isinstance(callee, ast.Attribute) and isinstance(callee.value, ast.Name): + class_name = callee.attr + else: + return None + if isinstance(node.targets[0], ast.Name): + instance = node.targets[0].id + return instance, class_name + return None + + def _extract_call(self, call) -> Tuple[str, str, Dict[str, Any], str]: + import ast + owner_name, method_name, call_kind = "", "", "func" + if isinstance(call.func, ast.Attribute): + method_name = call.func.attr + if isinstance(call.func.value, ast.Name): + owner_name = call.func.value.id + call_kind = "instance" if owner_name in self.instance_to_resource else "class_or_module" + elif isinstance(call.func.value, ast.Attribute) and isinstance(call.func.value.value, ast.Name): + owner_name = call.func.value.attr + call_kind = "class_or_module" + elif isinstance(call.func, ast.Name): + method_name = call.func.id + call_kind = "func" + + def pack(node): + if isinstance(node, ast.Name): + return {"type": "variable", "value": node.id} + if isinstance(node, ast.Constant): + return {"type": "constant", "value": node.value} + if isinstance(node, ast.Dict): + return {"type": "dict", "value": self._parse_dict(node)} + if isinstance(node, ast.List): + return {"type": "list", "value": self._parse_list(node)} + return {"type": "raw", "value": ast.unparse(node) if hasattr(ast, "unparse") else str(node)} + + args: Dict[str, Any] = {} + pos: List[Any] = [] + for a in call.args: + pos.append(pack(a)) + for kw in call.keywords: + args[kw.arg] = pack(kw.value) + if pos: + args["_positional"] = pos + return owner_name, method_name, args, call_kind + + def _parse_dict(self, node) -> Dict[str, Any]: + import ast + out: Dict[str, Any] = {} + for k, v in zip(node.keys, node.values): + if isinstance(k, ast.Constant): + key = str(k.value) + if isinstance(v, ast.Name): + out[key] = f"var:{v.id}" + elif isinstance(v, ast.Constant): + out[key] = v.value + elif isinstance(v, ast.Dict): + out[key] = self._parse_dict(v) + elif isinstance(v, ast.List): + out[key] = self._parse_list(v) + return out + + def _parse_list(self, node) -> List[Any]: + import ast + out: List[Any] = [] + for elt in node.elts: + if isinstance(elt, ast.Name): + out.append(f"var:{elt.id}") + elif isinstance(elt, ast.Constant): + out.append(elt.value) + elif isinstance(elt, ast.Dict): + out.append(self._parse_dict(elt)) + elif isinstance(elt, ast.List): + out.append(self._parse_list(elt)) + return out + + def _normalize_var_tokens(self, x: Any) -> Any: + if isinstance(x, str) and x.startswith("var:"): + return {"__var__": x[4:]} + if isinstance(x, list): + return [self._normalize_var_tokens(i) for i in x] + if isinstance(x, dict): + return {k: self._normalize_var_tokens(v) for k, v in x.items()} + return x + + def _make_params_payload(self, resource_name: Optional[str], template_name: str, call_args: Dict[str, Any]) -> Dict[str, Any]: + input_keys = self.registry.get_action_input_keys(resource_name, template_name) if resource_name else [] + defaults = self.registry.get_action_goal_default(resource_name, template_name) if resource_name else {} + params: Dict[str, Any] = dict(defaults) + + def unpack(p): + t, v = p.get("type"), p.get("value") + if t == "variable": + return {"__var__": v} + if t == "dict": + return self._normalize_var_tokens(v) + if t == "list": + return self._normalize_var_tokens(v) + return v + + for k, p in call_args.items(): + if k == "_positional": + continue + params[k] = unpack(p) + + pos = call_args.get("_positional", []) + if pos: + if input_keys: + for i, p in enumerate(pos): + if i >= len(input_keys): + break + name = input_keys[i] + if name in params: + continue + params[name] = unpack(p) + else: + for i, p in enumerate(pos): + params[f"arg_{i}"] = unpack(p) + return params + + # ---- handlers ---- + def _on_assign(self, stmt): + import ast + inst = self._extract_device_instantiation(stmt) + if inst: + instance, code_class = inst + resource_name = self.registry.resolve_resource_by_classname(code_class) + self.instance_to_resource[instance] = resource_name + return + + if isinstance(stmt.value, ast.Call): + owner, method, call_args, kind = self._extract_call(stmt.value) + if kind == "instance": + device_key = owner + resource_name = self.instance_to_resource.get(owner) + else: + device_key = owner + resource_name = self.registry.resolve_resource_by_classname(owner) + + module = self.registry.get_device_module(resource_name) + params = self._make_params_payload(resource_name, method, call_args) + + nid = self._new_node_id() + self.graph.add_workflow_node( + nid, + device_key=device_key, + resource_name=resource_name, # ✅ + module=module, + template_name=method, # ✅ + params=params, + variable_sources=self.variable_sources, + add_ready_if_no_vars=True, + prev_node_id=(nid - 1) if nid > 0 else None, + ) + + out_vars = self._assign_targets(stmt.targets[0]) + for var in out_vars: + self.variable_sources[var] = {"node_id": nid, "output_name": "result"} + + def _on_expr(self, stmt): + import ast + if not isinstance(stmt.value, ast.Call): + return + owner, method, call_args, kind = self._extract_call(stmt.value) + if kind == "instance": + device_key = owner + resource_name = self.instance_to_resource.get(owner) + else: + device_key = owner + resource_name = self.registry.resolve_resource_by_classname(owner) + + module = self.registry.get_device_module(resource_name) + params = self._make_params_payload(resource_name, method, call_args) + + nid = self._new_node_id() + self.graph.add_workflow_node( + nid, + device_key=device_key, + resource_name=resource_name, # ✅ + module=module, + template_name=method, # ✅ + params=params, + variable_sources=self.variable_sources, + add_ready_if_no_vars=True, + prev_node_id=(nid - 1) if nid > 0 else None, + ) + + def convert(self, python_code: str): + tree = ast.parse(python_code) + for stmt in tree.body: + if isinstance(stmt, ast.Assign): + self._on_assign(stmt) + elif isinstance(stmt, ast.Expr): + self._on_expr(stmt) + return self diff --git a/unilabos/workflow/from_xdl.py b/unilabos/workflow/from_xdl.py new file mode 100644 index 00000000..1041f9ad --- /dev/null +++ b/unilabos/workflow/from_xdl.py @@ -0,0 +1,131 @@ +from typing import List, Any, Dict +import xml.etree.ElementTree as ET + + +def convert_to_type(val: str) -> Any: + """将字符串值转换为适当的数据类型""" + if val == "True": + return True + if val == "False": + return False + if val == "?": + return None + if val.endswith(" g"): + return float(val.split(" ")[0]) + if val.endswith("mg"): + return float(val.split("mg")[0]) + elif val.endswith("mmol"): + return float(val.split("mmol")[0]) / 1000 + elif val.endswith("mol"): + return float(val.split("mol")[0]) + elif val.endswith("ml"): + return float(val.split("ml")[0]) + elif val.endswith("RPM"): + return float(val.split("RPM")[0]) + elif val.endswith(" °C"): + return float(val.split(" ")[0]) + elif val.endswith(" %"): + return float(val.split(" ")[0]) + return val + + +def flatten_xdl_procedure(procedure_elem: ET.Element) -> List[ET.Element]: + """展平嵌套的XDL程序结构""" + flattened_operations = [] + TEMP_UNSUPPORTED_PROTOCOL = ["Purge", "Wait", "Stir", "ResetHandling"] + + def extract_operations(element: ET.Element): + if element.tag not in ["Prep", "Reaction", "Workup", "Purification", "Procedure"]: + if element.tag not in TEMP_UNSUPPORTED_PROTOCOL: + flattened_operations.append(element) + + for child in element: + extract_operations(child) + + for child in procedure_elem: + extract_operations(child) + + return flattened_operations + + +def parse_xdl_content(xdl_content: str) -> tuple: + """解析XDL内容""" + try: + xdl_content_cleaned = "".join(c for c in xdl_content if c.isprintable()) + root = ET.fromstring(xdl_content_cleaned) + + synthesis_elem = root.find("Synthesis") + if synthesis_elem is None: + return None, None, None + + # 解析硬件组件 + hardware_elem = synthesis_elem.find("Hardware") + hardware = [] + if hardware_elem is not None: + hardware = [{"id": c.get("id"), "type": c.get("type")} for c in hardware_elem.findall("Component")] + + # 解析试剂 + reagents_elem = synthesis_elem.find("Reagents") + reagents = [] + if reagents_elem is not None: + reagents = [{"name": r.get("name"), "role": r.get("role", "")} for r in reagents_elem.findall("Reagent")] + + # 解析程序 + procedure_elem = synthesis_elem.find("Procedure") + if procedure_elem is None: + return None, None, None + + flattened_operations = flatten_xdl_procedure(procedure_elem) + return hardware, reagents, flattened_operations + + except ET.ParseError as e: + raise ValueError(f"Invalid XDL format: {e}") + + +def convert_xdl_to_dict(xdl_content: str) -> Dict[str, Any]: + """ + 将XDL XML格式转换为标准的字典格式 + + Args: + xdl_content: XDL XML内容 + + Returns: + 转换结果,包含步骤和器材信息 + """ + try: + hardware, reagents, flattened_operations = parse_xdl_content(xdl_content) + if hardware is None: + return {"error": "Failed to parse XDL content", "success": False} + + # 将XDL元素转换为字典格式 + steps_data = [] + for elem in flattened_operations: + # 转换参数类型 + parameters = {} + for key, val in elem.attrib.items(): + converted_val = convert_to_type(val) + if converted_val is not None: + parameters[key] = converted_val + + step_dict = { + "operation": elem.tag, + "parameters": parameters, + "description": elem.get("purpose", f"Operation: {elem.tag}"), + } + steps_data.append(step_dict) + + # 合并硬件和试剂为统一的labware_info格式 + labware_data = [] + labware_data.extend({"id": hw["id"], "type": "hardware", **hw} for hw in hardware) + labware_data.extend({"name": reagent["name"], "type": "reagent", **reagent} for reagent in reagents) + + return { + "success": True, + "steps": steps_data, + "labware": labware_data, + "message": f"Successfully converted XDL to dict format. Found {len(steps_data)} steps and {len(labware_data)} labware items.", + } + + except Exception as e: + error_msg = f"XDL conversion failed: {str(e)}" + return {"error": error_msg, "success": False}