diff --git a/scripts/workflow.py b/scripts/workflow.py new file mode 100644 index 00000000..6db3c13e --- /dev/null +++ b/scripts/workflow.py @@ -0,0 +1,689 @@ +import json +import logging +import traceback +import uuid +import xml.etree.ElementTree as ET +from typing import Any, Dict, List + +import networkx as nx +import matplotlib.pyplot as plt +import requests + +logger = logging.getLogger(__name__) + + +class SimpleGraph: + """简单的有向图实现,用于构建工作流图""" + + def __init__(self): + self.nodes = {} + self.edges = [] + + def add_node(self, node_id, **attrs): + """添加节点""" + self.nodes[node_id] = attrs + + def add_edge(self, source, target, **attrs): + """添加边""" + edge = {"source": source, "target": target, **attrs} + self.edges.append(edge) + + def to_dict(self): + """转换为工作流图格式""" + nodes_list = [] + for node_id, attrs in self.nodes.items(): + node_attrs = attrs.copy() + params = node_attrs.pop("parameters", {}) or {} + node_attrs.update(params) + nodes_list.append({"id": node_id, **node_attrs}) + + return { + "directed": True, + "multigraph": False, + "graph": {}, + "nodes": nodes_list, + "links": self.edges, + } + + +def extract_json_from_markdown(text: str) -> str: + """从markdown代码块中提取JSON""" + text = text.strip() + if text.startswith("```json\n"): + text = text[8:] + if text.startswith("```\n"): + text = text[4:] + if text.endswith("\n```"): + text = text[:-4] + return text + + +def convert_to_type(val: str) -> Any: + """将字符串值转换为适当的数据类型""" + if val == "True": + return True + if val == "False": + return False + if val == "?": + return None + if val.endswith(" g"): + return float(val.split(" ")[0]) + if val.endswith("mg"): + return float(val.split("mg")[0]) + elif val.endswith("mmol"): + return float(val.split("mmol")[0]) / 1000 + elif val.endswith("mol"): + return float(val.split("mol")[0]) + elif val.endswith("ml"): + return float(val.split("ml")[0]) + elif val.endswith("RPM"): + return float(val.split("RPM")[0]) + elif val.endswith(" °C"): + return float(val.split(" ")[0]) + elif val.endswith(" %"): + return float(val.split(" ")[0]) + return val + + +def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """统一的数据重构函数,根据操作类型自动选择模板""" + refactored_data = [] + + # 定义操作映射,包含生物实验和有机化学的所有操作 + OPERATION_MAPPING = { + # 生物实验操作 + "transfer_liquid": "SynBioFactory-liquid_handler.transfer_liquid", + "transfer": "SynBioFactory-liquid_handler.biomek-transfer", + "incubation": "SynBioFactory-liquid_handler.biomek-incubation", + "move_labware": "SynBioFactory-liquid_handler.biomek-move_labware", + "oscillation": "SynBioFactory-liquid_handler.biomek-oscillation", + # 有机化学操作 + "HeatChillToTemp": "SynBioFactory-workstation-HeatChillProtocol", + "StopHeatChill": "SynBioFactory-workstation-HeatChillStopProtocol", + "StartHeatChill": "SynBioFactory-workstation-HeatChillStartProtocol", + "HeatChill": "SynBioFactory-workstation-HeatChillProtocol", + "Dissolve": "SynBioFactory-workstation-DissolveProtocol", + "Transfer": "SynBioFactory-workstation-TransferProtocol", + "Evaporate": "SynBioFactory-workstation-EvaporateProtocol", + "Recrystallize": "SynBioFactory-workstation-RecrystallizeProtocol", + "Filter": "SynBioFactory-workstation-FilterProtocol", + "Dry": "SynBioFactory-workstation-DryProtocol", + "Add": "SynBioFactory-workstation-AddProtocol", + } + + UNSUPPORTED_OPERATIONS = ["Purge", "Wait", "Stir", "ResetHandling"] + + for step in data: + operation = step.get("action") + if not operation or operation in UNSUPPORTED_OPERATIONS: + continue + + # 处理重复操作 + if operation == "Repeat": + times = 
step.get("times", step.get("parameters", {}).get("times", 1)) + sub_steps = step.get("steps", step.get("parameters", {}).get("steps", [])) + for i in range(int(times)): + sub_data = refactor_data(sub_steps) + refactored_data.extend(sub_data) + continue + + # 获取模板名称 + template = OPERATION_MAPPING.get(operation) + if not template: + # 自动推断模板类型 + if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]: + template = f"SynBioFactory-liquid_handler.biomek-{operation}" + else: + template = f"SynBioFactory-workstation-{operation}Protocol" + + # 创建步骤数据 + step_data = { + "template": template, + "description": step.get("description", step.get("purpose", f"{operation} operation")), + "lab_node_type": "Device", + "parameters": step.get("parameters", step.get("action_args", {})), + } + refactored_data.append(step_data) + + return refactored_data + + +def build_protocol_graph( + labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str +) -> SimpleGraph: + """统一的协议图构建函数,根据设备类型自动选择构建逻辑""" + G = SimpleGraph() + resource_last_writer = {} + LAB_NAME = "SynBioFactory" + + protocol_steps = refactor_data(protocol_steps) + + # 检查协议步骤中的模板来判断协议类型 + has_biomek_template = any("biomek" in step.get("template", "") for step in protocol_steps) + + if has_biomek_template: + # 生物实验协议图构建 + for labware_id, labware in labware_info.items(): + node_id = str(uuid.uuid4()) + + labware_attrs = labware.copy() + labware_id = labware_attrs.pop("id", labware_attrs.get("name", f"labware_{uuid.uuid4()}")) + labware_attrs["description"] = labware_id + labware_attrs["lab_node_type"] = ( + "Reagent" if "Plate" in str(labware_id) else "Labware" if "Rack" in str(labware_id) else "Sample" + ) + labware_attrs["device_id"] = workstation_name + + G.add_node(node_id, template=f"{LAB_NAME}-host_node-create_resource", **labware_attrs) + resource_last_writer[labware_id] = f"{node_id}:labware" + + # 处理协议步骤 + for i, step in enumerate(protocol_steps): + node_id = str(uuid.uuid4()) + G.add_node(node_id, **step) + + # 添加控制流边 + G.add_edge(prev_node, node_id, source_port="ready", target_port="ready") + prev_node = node_id + + # 处理物料流 + params = step.get("parameters", {}) + if "sources" in params and params["sources"] in resource_last_writer: + source_node, source_port = resource_last_writer[params["sources"]].split(":") + G.add_edge(source_node, node_id, source_port=source_port, target_port="labware") + + if "targets" in params: + resource_last_writer[params["targets"]] = f"{node_id}:labware" + + # 添加协议结束节点 + end_id = str(uuid.uuid4()) + G.add_node(end_id, template=f"{LAB_NAME}-liquid_handler.biomek-run_protocol") + G.add_edge(prev_node, end_id, source_port="ready", target_port="ready") + + else: + # 有机化学协议图构建 + WORKSTATION_ID = workstation_name + + # 为所有labware创建资源节点 + for item_id, item in labware_info.items(): + # item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}") + node_id = str(uuid.uuid4()) + + # 判断节点类型 + if item.get("type") == "hardware" or "reactor" in str(item_id).lower(): + if "reactor" not in str(item_id).lower(): + continue + lab_node_type = "Sample" + description = f"Prepare Reactor: {item_id}" + liquid_type = [] + liquid_volume = [] + else: + lab_node_type = "Reagent" + description = f"Add Reagent to Flask: {item_id}" + liquid_type = [item_id] + liquid_volume = [1e5] + + G.add_node( + node_id, + template=f"{LAB_NAME}-host_node-create_resource", + description=description, + lab_node_type=lab_node_type, + res_id=item_id, + device_id=WORKSTATION_ID, + 
class_name="container", + parent=WORKSTATION_ID, + bind_locations={"x": 0.0, "y": 0.0, "z": 0.0}, + liquid_input_slot=[-1], + liquid_type=liquid_type, + liquid_volume=liquid_volume, + slot_on_deck="", + role=item.get("role", ""), + ) + resource_last_writer[item_id] = f"{node_id}:labware" + + last_control_node_id = None + + # 处理协议步骤 + for step in protocol_steps: + node_id = str(uuid.uuid4()) + G.add_node(node_id, **step) + + # 控制流 + if last_control_node_id is not None: + G.add_edge(last_control_node_id, node_id, source_port="ready", target_port="ready") + last_control_node_id = node_id + + # 物料流 + params = step.get("parameters", {}) + input_resources = { + "Vessel": params.get("vessel"), + "ToVessel": params.get("to_vessel"), + "FromVessel": params.get("from_vessel"), + "reagent": params.get("reagent"), + "solvent": params.get("solvent"), + "compound": params.get("compound"), + "sources": params.get("sources"), + "targets": params.get("targets"), + } + + for target_port, resource_name in input_resources.items(): + if resource_name and resource_name in resource_last_writer: + source_node, source_port = resource_last_writer[resource_name].split(":") + G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port) + + output_resources = { + "VesselOut": params.get("vessel"), + "FromVesselOut": params.get("from_vessel"), + "ToVesselOut": params.get("to_vessel"), + "FiltrateOut": params.get("filtrate_vessel"), + "reagent": params.get("reagent"), + "solvent": params.get("solvent"), + "compound": params.get("compound"), + "sources_out": params.get("sources"), + "targets_out": params.get("targets"), + } + + for source_port, resource_name in output_resources.items(): + if resource_name: + resource_last_writer[resource_name] = f"{node_id}:{source_port}" + + return G + + +def draw_protocol_graph(protocol_graph: SimpleGraph, output_path: str): + """ + (辅助功能) 使用 networkx 和 matplotlib 绘制协议工作流图,用于可视化。 + """ + if not protocol_graph: + print("Cannot draw graph: Graph object is empty.") + return + + G = nx.DiGraph() + + for node_id, attrs in protocol_graph.nodes.items(): + label = attrs.get("description", attrs.get("template", node_id[:8])) + G.add_node(node_id, label=label, **attrs) + + for edge in protocol_graph.edges: + G.add_edge(edge["source"], edge["target"]) + + plt.figure(figsize=(20, 15)) + try: + pos = nx.nx_agraph.graphviz_layout(G, prog="dot") + except Exception: + pos = nx.shell_layout(G) # Fallback layout + + node_labels = {node: data["label"] for node, data in G.nodes(data=True)} + nx.draw( + G, + pos, + with_labels=False, + node_size=2500, + node_color="skyblue", + node_shape="o", + edge_color="gray", + width=1.5, + arrowsize=15, + ) + nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=8, font_weight="bold") + + plt.title("Chemical Protocol Workflow Graph", size=15) + plt.savefig(output_path, dpi=300, bbox_inches="tight") + plt.close() + print(f" - Visualization saved to '{output_path}'") + + +from networkx.drawing.nx_agraph import to_agraph +import re + +COMPASS = {"n","e","s","w","ne","nw","se","sw","c"} + +def _is_compass(port: str) -> bool: + return isinstance(port, str) and port.lower() in COMPASS + +def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"): + """ + 使用 Graphviz 端口语法绘制协议工作流图。 + - 若边上的 source_port/target_port 是 compass(n/e/s/w/...),直接用 compass。 + - 否则自动为节点创建 record 形状并定义命名端口 。 + 最终由 PyGraphviz 渲染并输出到 output_path(后缀决定格式,如 .png/.svg/.pdf)。 + """ + if not protocol_graph: + print("Cannot draw graph: Graph object is 
empty.") + return + + # 1) 先用 networkx 搭建有向图,保留端口属性 + G = nx.DiGraph() + for node_id, attrs in protocol_graph.nodes.items(): + label = attrs.get("description", attrs.get("template", node_id[:8])) + # 保留一个干净的“中心标签”,用于放在 record 的中间槽 + G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)}) + + edges_data = [] + in_ports_by_node = {} # 收集命名输入端口 + out_ports_by_node = {} # 收集命名输出端口 + + for edge in protocol_graph.edges: + u = edge["source"] + v = edge["target"] + sp = edge.get("source_port") + tp = edge.get("target_port") + + # 记录到图里(保留原始端口信息) + G.add_edge(u, v, source_port=sp, target_port=tp) + edges_data.append((u, v, sp, tp)) + + # 如果不是 compass,就按“命名端口”先归类,等会儿给节点造 record + if sp and not _is_compass(sp): + out_ports_by_node.setdefault(u, set()).add(str(sp)) + if tp and not _is_compass(tp): + in_ports_by_node.setdefault(v, set()).add(str(tp)) + + # 2) 转为 AGraph,使用 Graphviz 渲染 + A = to_agraph(G) + A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10") + A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica") + A.edge_attr.update(arrowsize="0.8", color="#666666") + + # 3) 为需要命名端口的节点设置 record 形状与 label + # 左列 = 输入端口;中间 = 核心标签;右列 = 输出端口 + for n in A.nodes(): + node = A.get_node(n) + core = G.nodes[n].get("_core_label", n) + + in_ports = sorted(in_ports_by_node.get(n, [])) + out_ports = sorted(out_ports_by_node.get(n, [])) + + # 如果该节点涉及命名端口,则用 record;否则保留原 box + if in_ports or out_ports: + def port_fields(ports): + if not ports: + return " " # 必须留一个空槽占位 + # 每个端口一个小格子,
格式为 <port>
name + return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports) + + left = port_fields(in_ports) + right = port_fields(out_ports) + + # 三栏:左(入) | 中(节点名) | 右(出) + record_label = f"{{ {left} | {core} | {right} }}" + node.attr.update(shape="record", label=record_label) + else: + # 没有命名端口:普通盒子,显示核心标签 + node.attr.update(label=str(core)) + + # 4) 给边设置 headport / tailport + # - 若端口为 compass:直接用 compass(e.g., headport="e") + # - 若端口为命名端口:使用在 record 中定义的 名(同名即可) + for (u, v, sp, tp) in edges_data: + e = A.get_edge(u, v) + + # Graphviz 属性:tail 是源,head 是目标 + if sp: + if _is_compass(sp): + e.attr["tailport"] = sp.lower() + else: + # 与 record label 中 名一致;特殊字符已在 label 中做了清洗 + e.attr["tailport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(sp)) + + if tp: + if _is_compass(tp): + e.attr["headport"] = tp.lower() + else: + e.attr["headport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(tp)) + + # 可选:若想让边更贴边缘,可设置 constraint/spline 等 + # e.attr["arrowhead"] = "vee" + + # 5) 输出 + A.draw(output_path, prog="dot") + print(f" - Port-aware workflow rendered to '{output_path}'") + + +def flatten_xdl_procedure(procedure_elem: ET.Element) -> List[ET.Element]: + """展平嵌套的XDL程序结构""" + flattened_operations = [] + TEMP_UNSUPPORTED_PROTOCOL = ["Purge", "Wait", "Stir", "ResetHandling"] + + def extract_operations(element: ET.Element): + if element.tag not in ["Prep", "Reaction", "Workup", "Purification", "Procedure"]: + if element.tag not in TEMP_UNSUPPORTED_PROTOCOL: + flattened_operations.append(element) + + for child in element: + extract_operations(child) + + for child in procedure_elem: + extract_operations(child) + + return flattened_operations + + +def parse_xdl_content(xdl_content: str) -> tuple: + """解析XDL内容""" + try: + xdl_content_cleaned = "".join(c for c in xdl_content if c.isprintable()) + root = ET.fromstring(xdl_content_cleaned) + + synthesis_elem = root.find("Synthesis") + if synthesis_elem is None: + return None, None, None + + # 解析硬件组件 + hardware_elem = synthesis_elem.find("Hardware") + hardware = [] + if hardware_elem is not None: + hardware = [{"id": c.get("id"), "type": c.get("type")} for c in hardware_elem.findall("Component")] + + # 解析试剂 + reagents_elem = synthesis_elem.find("Reagents") + reagents = [] + if reagents_elem is not None: + reagents = [{"name": r.get("name"), "role": r.get("role", "")} for r in reagents_elem.findall("Reagent")] + + # 解析程序 + procedure_elem = synthesis_elem.find("Procedure") + if procedure_elem is None: + return None, None, None + + flattened_operations = flatten_xdl_procedure(procedure_elem) + return hardware, reagents, flattened_operations + + except ET.ParseError as e: + raise ValueError(f"Invalid XDL format: {e}") + + +def convert_xdl_to_dict(xdl_content: str) -> Dict[str, Any]: + """ + 将XDL XML格式转换为标准的字典格式 + + Args: + xdl_content: XDL XML内容 + + Returns: + 转换结果,包含步骤和器材信息 + """ + try: + hardware, reagents, flattened_operations = parse_xdl_content(xdl_content) + if hardware is None: + return {"error": "Failed to parse XDL content", "success": False} + + # 将XDL元素转换为字典格式 + steps_data = [] + for elem in flattened_operations: + # 转换参数类型 + parameters = {} + for key, val in elem.attrib.items(): + converted_val = convert_to_type(val) + if converted_val is not None: + parameters[key] = converted_val + + step_dict = { + "operation": elem.tag, + "parameters": parameters, + "description": elem.get("purpose", f"Operation: {elem.tag}"), + } + steps_data.append(step_dict) + + # 合并硬件和试剂为统一的labware_info格式 + labware_data = [] + labware_data.extend({"id": hw["id"], "type": 
"hardware", **hw} for hw in hardware) + labware_data.extend({"name": reagent["name"], "type": "reagent", **reagent} for reagent in reagents) + + return { + "success": True, + "steps": steps_data, + "labware": labware_data, + "message": f"Successfully converted XDL to dict format. Found {len(steps_data)} steps and {len(labware_data)} labware items.", + } + + except Exception as e: + error_msg = f"XDL conversion failed: {str(e)}" + logger.error(error_msg) + return {"error": error_msg, "success": False} + + +def create_workflow( + steps_info: str, + labware_info: str, + workflow_name: str = "Generated Workflow", + workstation_name: str = "workstation", + workflow_description: str = "Auto-generated workflow from protocol", +) -> Dict[str, Any]: + """ + 创建工作流,输入数据已经是统一的字典格式 + + Args: + steps_info: 步骤信息 (JSON字符串,已经是list of dict格式) + labware_info: 实验器材和试剂信息 (JSON字符串,已经是list of dict格式) + workflow_name: 工作流名称 + workflow_description: 工作流描述 + + Returns: + 创建结果,包含工作流UUID和详细信息 + """ + try: + # 直接解析JSON数据 + steps_info_clean = extract_json_from_markdown(steps_info) + labware_info_clean = extract_json_from_markdown(labware_info) + + steps_data = json.loads(steps_info_clean) + labware_data = json.loads(labware_info_clean) + + # 统一处理所有数据 + protocol_graph = build_protocol_graph(labware_data, steps_data, workstation_name=workstation_name) + + # 检测协议类型(用于标签) + protocol_type = "bio" if any("biomek" in step.get("template", "") for step in refactored_steps) else "organic" + + # 转换为工作流格式 + data = protocol_graph.to_dict() + + # 转换节点格式 + for i, node in enumerate(data["nodes"]): + description = node.get("description", "") + onode = { + "template": node.pop("template"), + "id": node["id"], + "lab_node_type": node.get("lab_node_type", "Device"), + "name": description or f"Node {i + 1}", + "params": {"default": node}, + "handles": {}, + } + + # 处理边连接 + for edge in data["links"]: + if edge["source"] == node["id"]: + source_port = edge.get("source_port", "output") + if source_port not in onode["handles"]: + onode["handles"][source_port] = {"type": "source"} + + if edge["target"] == node["id"]: + target_port = edge.get("target_port", "input") + if target_port not in onode["handles"]: + onode["handles"][target_port] = {"type": "target"} + + data["nodes"][i] = onode + + # 发送到API创建工作流 + api_secret = configs.Lab.Key + if not api_secret: + return {"error": "API SecretKey is not configured", "success": False} + + # Step 1: 创建工作流 + workflow_url = f"{configs.Lab.Api}/api/v1/workflow/" + headers = { + "Content-Type": "application/json", + } + params = {"secret_key": api_secret} + + graph_data = {"name": workflow_name, **data} + + logger.info(f"Creating workflow: {workflow_name}") + response = requests.post( + workflow_url, params=params, json=graph_data, headers=headers, timeout=configs.Lab.Timeout + ) + response.raise_for_status() + + workflow_info = response.json() + + if workflow_info.get("code") != 0: + error_msg = f"API returned an error: {workflow_info.get('msg', 'Unknown Error')}" + logger.error(error_msg) + return {"error": error_msg, "success": False} + + workflow_uuid = workflow_info.get("data", {}).get("uuid") + if not workflow_uuid: + return {"error": "Failed to get workflow UUID from response", "success": False} + + # Step 2: 添加到模板库(可选) + try: + library_url = f"{configs.Lab.Api}/api/flociety/vs/workflows/library/" + lib_payload = { + "workflow_uuid": workflow_uuid, + "title": workflow_name, + "description": workflow_description, + "labels": [protocol_type.title(), "Auto-generated"], + } + + library_response = 
requests.post( + library_url, params=params, json=lib_payload, headers=headers, timeout=configs.Lab.Timeout + ) + library_response.raise_for_status() + + library_info = library_response.json() + logger.info(f"Workflow added to library: {library_info}") + + return { + "success": True, + "workflow_uuid": workflow_uuid, + "workflow_info": workflow_info.get("data"), + "library_info": library_info.get("data"), + "protocol_type": protocol_type, + "message": f"Workflow '{workflow_name}' created successfully", + } + + except Exception as e: + # 即使添加到库失败,工作流创建仍然成功 + logger.warning(f"Failed to add workflow to library: {str(e)}") + return { + "success": True, + "workflow_uuid": workflow_uuid, + "workflow_info": workflow_info.get("data"), + "protocol_type": protocol_type, + "message": f"Workflow '{workflow_name}' created successfully (library addition failed)", + } + + except requests.exceptions.RequestException as e: + error_msg = f"Network error when calling API: {str(e)}" + logger.error(error_msg) + return {"error": error_msg, "success": False} + except json.JSONDecodeError as e: + error_msg = f"JSON parsing error: {str(e)}" + logger.error(error_msg) + return {"error": error_msg, "success": False} + except Exception as e: + error_msg = f"An unexpected error occurred: {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + return {"error": error_msg, "success": False} diff --git a/test/workflow/example_bio.json b/test/workflow/example_bio.json new file mode 100644 index 00000000..d0f0d7a3 --- /dev/null +++ b/test/workflow/example_bio.json @@ -0,0 +1,186 @@ +{ + "workflow": [ + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_1", + "targets": "Liquid_2", + "asp_vol": 66.0, + "dis_vol": 66.0, + "asp_flow_rate": 94.0, + "dis_flow_rate": 94.0 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_2", + "targets": "Liquid_3", + "asp_vol": 58.0, + "dis_vol": 96.0, + "asp_flow_rate": 94.0, + "dis_flow_rate": 94.0 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_4", + "targets": "Liquid_2", + "asp_vol": 85.0, + "dis_vol": 170.0, + "asp_flow_rate": 94.0, + "dis_flow_rate": 94.0 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_4", + "targets": "Liquid_2", + "asp_vol": 63.333333333333336, + "dis_vol": 170.0, + "asp_flow_rate": 94.0, + "dis_flow_rate": 94.0 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_2", + "targets": "Liquid_3", + "asp_vol": 72.0, + "dis_vol": 150.0, + "asp_flow_rate": 94.0, + "dis_flow_rate": 94.0 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_4", + "targets": "Liquid_2", + "asp_vol": 85.0, + "dis_vol": 170.0, + "asp_flow_rate": 94.0, + "dis_flow_rate": 94.0 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_4", + "targets": "Liquid_2", + "asp_vol": 63.333333333333336, + "dis_vol": 170.0, + "asp_flow_rate": 94.0, + "dis_flow_rate": 94.0 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_2", + "targets": "Liquid_3", + "asp_vol": 72.0, + "dis_vol": 150.0, + "asp_flow_rate": 94.0, + "dis_flow_rate": 94.0 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_2", + "targets": "Liquid_3", + "asp_vol": 20.0, + "dis_vol": 20.0, + "asp_flow_rate": 7.6, + "dis_flow_rate": 7.6 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_5", + "targets": "Liquid_2", + 
"asp_vol": 6.0, + "dis_vol": 12.0, + "asp_flow_rate": 7.6, + "dis_flow_rate": 7.6 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_5", + "targets": "Liquid_2", + "asp_vol": 10.666666666666666, + "dis_vol": 12.0, + "asp_flow_rate": 7.599999999999999, + "dis_flow_rate": 7.6 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "Liquid_2", + "targets": "Liquid_6", + "asp_vol": 12.0, + "dis_vol": 10.0, + "asp_flow_rate": 7.6, + "dis_flow_rate": 7.6 + } + } + ], + "reagent": { + "Liquid_6": { + "slot": 1, + "well": [ + "A2" + ], + "labware": "elution plate" + }, + "Liquid_1": { + "slot": 2, + "well": [ + "A1", + "A2", + "A4" + ], + "labware": "reagent reservoir" + }, + "Liquid_4": { + "slot": 2, + "well": [ + "A1", + "A2", + "A4" + ], + "labware": "reagent reservoir" + }, + "Liquid_5": { + "slot": 2, + "well": [ + "A1", + "A2", + "A4" + ], + "labware": "reagent reservoir" + }, + "Liquid_2": { + "slot": 4, + "well": [ + "A2" + ], + "labware": "TAG1 plate on Magnetic Module GEN2" + }, + "Liquid_3": { + "slot": 12, + "well": [ + "A1" + ], + "labware": "Opentrons Fixed Trash" + } + } +} \ No newline at end of file diff --git a/test/workflow/merge_workflow.py b/test/workflow/merge_workflow.py new file mode 100644 index 00000000..f42783c5 --- /dev/null +++ b/test/workflow/merge_workflow.py @@ -0,0 +1,14 @@ +import pytest +import json +from scripts.workflow import build_protocol_graph, draw_protocol_graph, draw_protocol_graph_with_ports + + +@pytest.mark.parametrize("protocol_name", [ + "example_bio", + # "bioyond_materials_liquidhandling_1", +]) +def test_build_protocol_graph(protocol_name): + d = json.load(open(f"{protocol_name}.json")) + graph = build_protocol_graph(labware_info=d["reagent"], protocol_steps=d["workflow"], workstation_name="PRCXi") + draw_protocol_graph_with_ports(graph, "graph.png") + print(graph) \ No newline at end of file