From b0da1492520fb5a8e015a59093603bc7d99f0fa8 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Mon, 2 Feb 2026 17:18:45 +0800 Subject: [PATCH] fix upload workflow json --- tests/workflow/test.json | 213 +++++++++++ unilabos/app/web/client.py | 2 +- unilabos/workflow/common.py | 107 ++++-- unilabos/workflow/convert_from_json.py | 259 ++++++------- .../legacy/convert_from_json_legacy.py | 356 ++++++++++++++++++ 5 files changed, 748 insertions(+), 189 deletions(-) create mode 100644 tests/workflow/test.json create mode 100644 unilabos/workflow/legacy/convert_from_json_legacy.py diff --git a/tests/workflow/test.json b/tests/workflow/test.json new file mode 100644 index 0000000..8fc6449 --- /dev/null +++ b/tests/workflow/test.json @@ -0,0 +1,213 @@ +{ + "workflow": [ + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines", + "targets": "Liquid_1", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines", + "targets": "Liquid_2", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines", + "targets": "Liquid_3", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines_2", + "targets": "Liquid_4", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines_2", + "targets": "Liquid_5", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines_2", + "targets": "Liquid_6", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines_3", + "targets": "dest_set", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines_3", + "targets": "dest_set_2", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + }, + { + "action": "transfer_liquid", + "action_args": { + "sources": "cell_lines_3", + "targets": "dest_set_3", + "asp_vol": 100.0, + "dis_vol": 74.75, + "asp_flow_rate": 94.0, + "dis_flow_rate": 95.5 + } + } + ], + "reagent": { + "Liquid_1": { + "slot": 1, + "well": [ + "A4", + "A7", + "A10" + ], + "labware": "rep 1" + }, + "Liquid_4": { + "slot": 1, + "well": [ + "A4", + "A7", + "A10" + ], + "labware": "rep 1" + }, + "dest_set": { + "slot": 1, + "well": [ + "A4", + "A7", + "A10" + ], + "labware": "rep 1" + }, + "Liquid_2": { + "slot": 2, + "well": [ + "A3", + "A5", + "A8" + ], + "labware": "rep 2" + }, + "Liquid_5": { + "slot": 2, + "well": [ + "A3", + "A5", + "A8" + ], + "labware": "rep 2" + }, + "dest_set_2": { + "slot": 2, + "well": [ + "A3", + "A5", + "A8" + ], + "labware": "rep 2" + }, + "Liquid_3": { + "slot": 3, + "well": [ + "A4", + "A6", + "A10" + ], + "labware": "rep 3" + }, + "Liquid_6": { + "slot": 3, + "well": [ + "A4", + "A6", + "A10" + ], + "labware": "rep 3" + }, + "dest_set_3": { + "slot": 3, + "well": [ + "A4", + "A6", + "A10" + ], + "labware": "rep 3" + }, + "cell_lines": { + "slot": 4, + "well": [ + "A1", + "A3", + "A5" + ], + "labware": "DRUG + YOYO-MEDIA" + }, + "cell_lines_2": { + "slot": 4, + "well": [ + "A1", + "A3", + "A5" + ], + "labware": "DRUG + YOYO-MEDIA" + }, + "cell_lines_3": { + "slot": 4, + "well": [ + "A1", + "A3", + "A5" + ], + "labware": "DRUG + YOYO-MEDIA" + } + } +} \ No newline at end of file diff --git a/unilabos/app/web/client.py b/unilabos/app/web/client.py index 64a9418..7cefc8e 100644 --- a/unilabos/app/web/client.py +++ b/unilabos/app/web/client.py @@ -361,7 +361,7 @@ class HTTPClient: """ # target_lab_uuid 暂时使用默认值,后续由后端根据 ak/sk 获取 payload = { - "target_lab_uuid": "28c38bb0-63f6-4352-b0d8-b5b8eb1766d5", + "target_lab_uuid": "cf44e98c-7f3e-4175-b526-1fa338b43f65", "name": name, "data": { "workflow_uuid": workflow_uuid, diff --git a/unilabos/workflow/common.py b/unilabos/workflow/common.py index 9bff049..c1cf002 100644 --- a/unilabos/workflow/common.py +++ b/unilabos/workflow/common.py @@ -8,6 +8,20 @@ from typing import Dict, List, Any, Tuple, Optional Json = Dict[str, Any] + +# ==================== 默认配置 ==================== + +# create_resource 节点默认参数 +CREATE_RESOURCE_DEFAULTS = { + "device_id": "/PRCXI", + "parent": "/PRCXI/PRCXI_Deck", + "class_name": "PRCXI_BioER_96_wellplate", +} + +# 默认液体体积 (uL) +DEFAULT_LIQUID_VOLUME = 1e5 + + # ---------------- Graph ---------------- @@ -228,7 +242,7 @@ def refactor_data( def build_protocol_graph( - labware_info: List[Dict[str, Any]], + labware_info: Dict[str, Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str, action_resource_mapping: Optional[Dict[str, str]] = None, @@ -236,7 +250,7 @@ def build_protocol_graph( """统一的协议图构建函数,根据设备类型自动选择构建逻辑 Args: - labware_info: labware 信息字典 + labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...} protocol_steps: 协议步骤列表 workstation_name: 工作站名称 action_resource_mapping: action 到 resource_name 的映射字典,可选 @@ -251,13 +265,21 @@ def build_protocol_graph( # 为所有labware创建资源节点 res_index = 0 for labware_id, item in labware_info.items(): - # item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}") node_id = str(uuid.uuid4()) + # res_id 不能有空格,替换为下划线 + res_id = str(labware_id).replace(" ", "_") + + # 从 reagent 数据中获取 well 信息 + wells = item.get("well", []) + slot = str(item.get("slot", "")) # slot_on_deck 是字符串 + well_count = len(wells) if wells else 1 + # 判断节点类型 if "Rack" in str(labware_id) or "Tip" in str(labware_id): lab_node_type = "Labware" description = f"Prepare Labware: {labware_id}" + liquid_input_slot = wells if wells else [-1] liquid_type = [] liquid_volume = [] elif item.get("type") == "hardware" or "reactor" in str(labware_id).lower(): @@ -265,13 +287,16 @@ def build_protocol_graph( continue lab_node_type = "Sample" description = f"Prepare Reactor: {labware_id}" + liquid_input_slot = wells if wells else [-1] liquid_type = [] liquid_volume = [] else: lab_node_type = "Reagent" description = f"Add Reagent to Flask: {labware_id}" - liquid_type = [labware_id] - liquid_volume = [1e5] + # liquid_input_slot, liquid_type, liquid_volume 数量与 wells 保持一致 + liquid_input_slot = wells if wells else [-1] + liquid_type = [res_id] * well_count + liquid_volume = [DEFAULT_LIQUID_VOLUME] * well_count res_index += 1 G.add_node( @@ -283,21 +308,46 @@ def build_protocol_graph( lab_node_type=lab_node_type, footer="create_resource-host_node", param={ - "res_id": labware_id, - "device_id": WORKSTATION_ID, - "class_name": "container", - "parent": WORKSTATION_ID, + "res_id": res_id, + "device_id": CREATE_RESOURCE_DEFAULTS["device_id"], + "class_name": CREATE_RESOURCE_DEFAULTS["class_name"], + "parent": CREATE_RESOURCE_DEFAULTS["parent"], "bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0}, - "liquid_input_slot": [-1], + "liquid_input_slot": liquid_input_slot, "liquid_type": liquid_type, "liquid_volume": liquid_volume, - "slot_on_deck": "", + "slot_on_deck": slot, }, ) - resource_last_writer[labware_id] = f"{node_id}:labware" + # create_resource 节点输出 liquid_slots,用于连接 transfer_liquid 的 sources/targets + resource_last_writer[labware_id] = f"{node_id}:liquid_slots" last_control_node_id = None + # 端口名称映射:JSON 字段名 -> 实际 handle key + INPUT_PORT_MAPPING = { + "sources": "sources_identifier", + "targets": "targets_identifier", + "vessel": "vessel", + "to_vessel": "to_vessel", + "from_vessel": "from_vessel", + "reagent": "reagent", + "solvent": "solvent", + "compound": "compound", + } + + OUTPUT_PORT_MAPPING = { + "sources": "sources_out", # 输出端口是 xxx_out + "targets": "targets_out", # 输出端口是 xxx_out + "vessel": "vessel_out", + "to_vessel": "to_vessel_out", + "from_vessel": "from_vessel_out", + "filtrate_vessel": "filtrate_out", + "reagent": "reagent", + "solvent": "solvent", + "compound": "compound", + } + # 处理协议步骤 for step in protocol_steps: node_id = str(uuid.uuid4()) @@ -310,38 +360,19 @@ def build_protocol_graph( # 物料流 params = step.get("param", {}) - input_resources_possible_names = [ - "vessel", - "to_vessel", - "from_vessel", - "reagent", - "solvent", - "compound", - "sources", - "targets", - ] - for target_port in input_resources_possible_names: - resource_name = params.get(target_port) + # 处理输入连接 + for param_key, target_port in INPUT_PORT_MAPPING.items(): + resource_name = params.get(param_key) if resource_name and resource_name in resource_last_writer: source_node, source_port = resource_last_writer[resource_name].split(":") G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port) - output_resources = { - "vessel_out": params.get("vessel"), - "from_vessel_out": params.get("from_vessel"), - "to_vessel_out": params.get("to_vessel"), - "filtrate_out": params.get("filtrate_vessel"), - "reagent": params.get("reagent"), - "solvent": params.get("solvent"), - "compound": params.get("compound"), - "sources_out": params.get("sources"), - "targets_out": params.get("targets"), - } - - for source_port, resource_name in output_resources.items(): + # 处理输出:更新 resource_last_writer + for param_key, output_port in OUTPUT_PORT_MAPPING.items(): + resource_name = params.get(param_key) if resource_name: - resource_last_writer[resource_name] = f"{node_id}:{source_port}" + resource_last_writer[resource_name] = f"{node_id}:{output_port}" return G diff --git a/unilabos/workflow/convert_from_json.py b/unilabos/workflow/convert_from_json.py index 7a6d2b4..ff749d7 100644 --- a/unilabos/workflow/convert_from_json.py +++ b/unilabos/workflow/convert_from_json.py @@ -1,21 +1,68 @@ """ JSON 工作流转换模块 -提供从多种 JSON 格式转换为统一工作流格式的功能。 -支持的格式: -1. workflow/reagent 格式 -2. steps_info/labware_info 格式 +将 workflow/reagent 格式的 JSON 转换为统一工作流格式。 + +输入格式: +{ + "workflow": [ + {"action": "...", "action_args": {...}}, + ... + ], + "reagent": { + "reagent_name": {"slot": int, "well": [...], "labware": "..."}, + ... + } +} """ import json from os import PathLike from pathlib import Path -from typing import Any, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union from unilabos.workflow.common import WorkflowGraph, build_protocol_graph from unilabos.registry.registry import lab_registry +# ==================== 字段映射配置 ==================== + +# action 到 resource_name 的映射 +ACTION_RESOURCE_MAPPING: Dict[str, str] = { + # 生物实验操作 + "transfer_liquid": "liquid_handler.prcxi", + "transfer": "liquid_handler.prcxi", + "incubation": "incubator.prcxi", + "move_labware": "labware_mover.prcxi", + "oscillation": "shaker.prcxi", + # 有机化学操作 + "HeatChillToTemp": "heatchill.chemputer", + "StopHeatChill": "heatchill.chemputer", + "StartHeatChill": "heatchill.chemputer", + "HeatChill": "heatchill.chemputer", + "Dissolve": "stirrer.chemputer", + "Transfer": "liquid_handler.chemputer", + "Evaporate": "rotavap.chemputer", + "Recrystallize": "reactor.chemputer", + "Filter": "filter.chemputer", + "Dry": "dryer.chemputer", + "Add": "liquid_handler.chemputer", +} + +# action_args 字段到 parameters 字段的映射 +# 格式: {"old_key": "new_key"}, 仅映射需要重命名的字段 +ARGS_FIELD_MAPPING: Dict[str, str] = { + # 如果需要字段重命名,在这里配置 + # "old_field_name": "new_field_name", +} + +# 默认工作站名称 +DEFAULT_WORKSTATION = "PRCXI" + + +# ==================== 核心转换函数 ==================== + + def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List[str]]: """ 从 registry 获取指定设备和动作的 handles 配置 @@ -39,12 +86,10 @@ def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List handles = action_config.get("handles", {}) if isinstance(handles, dict): - # 处理 input handles (作为 target) for handle in handles.get("input", []): handler_key = handle.get("handler_key", "") if handler_key: result["source"].append(handler_key) - # 处理 output handles (作为 source) for handle in handles.get("output", []): handler_key = handle.get("handler_key", "") if handler_key: @@ -69,12 +114,9 @@ def validate_workflow_handles(graph: WorkflowGraph) -> Tuple[bool, List[str]]: for edge in graph.edges: left_uuid = edge.get("source") right_uuid = edge.get("target") - # target_handle_key是target, right的输入节点(入节点) - # source_handle_key是source, left的输出节点(出节点) right_source_conn_key = edge.get("target_handle_key", "") left_target_conn_key = edge.get("source_handle_key", "") - # 获取源节点和目标节点信息 left_node = nodes.get(left_uuid, {}) right_node = nodes.get(right_uuid, {}) @@ -83,164 +125,93 @@ def validate_workflow_handles(graph: WorkflowGraph) -> Tuple[bool, List[str]]: right_res_name = right_node.get("resource_name", "") right_template_name = right_node.get("template_name", "") - # 获取源节点的 output handles left_node_handles = get_action_handles(left_res_name, left_template_name) target_valid_keys = left_node_handles.get("target", []) target_valid_keys.append("ready") - # 获取目标节点的 input handles right_node_handles = get_action_handles(right_res_name, right_template_name) source_valid_keys = right_node_handles.get("source", []) source_valid_keys.append("ready") - # 如果节点配置了 output handles,则 source_port 必须有效 + # 验证目标节点(right)的输入端口 if not right_source_conn_key: - node_name = left_node.get("name", left_uuid[:8]) - errors.append(f"源节点 '{node_name}' 的 source_handle_key 为空," f"应设置为: {source_valid_keys}") + node_name = right_node.get("name", right_uuid[:8]) + errors.append(f"目标节点 '{node_name}' 的输入端口 (target_handle_key) 为空,应设置为: {source_valid_keys}") elif right_source_conn_key not in source_valid_keys: - node_name = left_node.get("name", left_uuid[:8]) + node_name = right_node.get("name", right_uuid[:8]) errors.append( - f"源节点 '{node_name}' 的 source 端点 '{right_source_conn_key}' 不存在," f"支持的端点: {source_valid_keys}" + f"目标节点 '{node_name}' 的输入端口 '{right_source_conn_key}' 不存在,支持的输入端口: {source_valid_keys}" ) - # 如果节点配置了 input handles,则 target_port 必须有效 + # 验证源节点(left)的输出端口 if not left_target_conn_key: - node_name = right_node.get("name", right_uuid[:8]) - errors.append(f"目标节点 '{node_name}' 的 target_handle_key 为空," f"应设置为: {target_valid_keys}") + node_name = left_node.get("name", left_uuid[:8]) + errors.append(f"源节点 '{node_name}' 的输出端口 (source_handle_key) 为空,应设置为: {target_valid_keys}") elif left_target_conn_key not in target_valid_keys: - node_name = right_node.get("name", right_uuid[:8]) + node_name = left_node.get("name", left_uuid[:8]) errors.append( - f"目标节点 '{node_name}' 的 target 端点 '{left_target_conn_key}' 不存在," - f"支持的端点: {target_valid_keys}" + f"源节点 '{node_name}' 的输出端口 '{left_target_conn_key}' 不存在,支持的输出端口: {target_valid_keys}" ) return len(errors) == 0, errors -# action 到 resource_name 的映射 -ACTION_RESOURCE_MAPPING: Dict[str, str] = { - # 生物实验操作 - "transfer_liquid": "liquid_handler.prcxi", - "transfer": "liquid_handler.prcxi", - "incubation": "incubator.prcxi", - "move_labware": "labware_mover.prcxi", - "oscillation": "shaker.prcxi", - # 有机化学操作 - "HeatChillToTemp": "heatchill.chemputer", - "StopHeatChill": "heatchill.chemputer", - "StartHeatChill": "heatchill.chemputer", - "HeatChill": "heatchill.chemputer", - "Dissolve": "stirrer.chemputer", - "Transfer": "liquid_handler.chemputer", - "Evaporate": "rotavap.chemputer", - "Recrystallize": "reactor.chemputer", - "Filter": "filter.chemputer", - "Dry": "dryer.chemputer", - "Add": "liquid_handler.chemputer", -} - - -def normalize_steps(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: +def normalize_workflow_steps(workflow: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """ - 将不同格式的步骤数据规范化为统一格式 + 将 workflow 格式的步骤数据规范化 - 支持的输入格式: - - action + parameters - - action + action_args - - operation + parameters + 输入格式: + [{"action": "...", "action_args": {...}}, ...] + + 输出格式: + [{"action": "...", "parameters": {...}, "step_number": int}, ...] Args: - data: 原始步骤数据列表 + workflow: workflow 数组 Returns: - 规范化后的步骤列表,格式为 [{"action": str, "parameters": dict, "description": str?, "step_number": int?}, ...] + 规范化后的步骤列表 """ normalized = [] - for idx, step in enumerate(data): - # 获取动作名称(支持 action 或 operation 字段) - action = step.get("action") or step.get("operation") + for idx, step in enumerate(workflow): + action = step.get("action") if not action: continue - # 获取参数(支持 parameters 或 action_args 字段) - raw_params = step.get("parameters") or step.get("action_args") or {} - params = dict(raw_params) + # 获取参数: action_args + raw_params = step.get("action_args", {}) + params = {} - # 规范化 source/target -> sources/targets - if "source" in raw_params and "sources" not in raw_params: - params["sources"] = raw_params["source"] - if "target" in raw_params and "targets" not in raw_params: - params["targets"] = raw_params["target"] + # 应用字段映射 + for key, value in raw_params.items(): + mapped_key = ARGS_FIELD_MAPPING.get(key, key) + params[mapped_key] = value - # 获取描述(支持 description 或 purpose 字段) - description = step.get("description") or step.get("purpose") + step_dict = { + "action": action, + "parameters": params, + "step_number": idx + 1, + } - # 获取步骤编号(优先使用原始数据中的 step_number,否则使用索引+1) - step_number = step.get("step_number", idx + 1) - - step_dict = {"action": action, "parameters": params, "step_number": step_number} - if description: - step_dict["description"] = description + # 保留描述字段 + if "description" in step: + step_dict["description"] = step["description"] normalized.append(step_dict) return normalized -def normalize_labware(data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: - """ - 将不同格式的 labware 数据规范化为统一的字典格式 - - 支持的输入格式: - - reagent_name + material_name + positions - - name + labware + slot - - Args: - data: 原始 labware 数据列表 - - Returns: - 规范化后的 labware 字典,格式为 {name: {"slot": int, "labware": str, "well": list, "type": str, "role": str, "name": str}, ...} - """ - labware = {} - for item in data: - # 获取 key 名称(优先使用 reagent_name,其次是 material_name 或 name) - reagent_name = item.get("reagent_name") - key = reagent_name or item.get("material_name") or item.get("name") - if not key: - continue - - key = str(key) - - # 处理重复 key,自动添加后缀 - idx = 1 - original_key = key - while key in labware: - idx += 1 - key = f"{original_key}_{idx}" - - labware[key] = { - "slot": item.get("positions") or item.get("slot"), - "labware": item.get("material_name") or item.get("labware"), - "well": item.get("well", []), - "type": item.get("type", "reagent"), - "role": item.get("role", ""), - "name": key, - } - - return labware - - def convert_from_json( data: Union[str, PathLike, Dict[str, Any]], - workstation_name: str = "PRCXi", + workstation_name: str = DEFAULT_WORKSTATION, validate: bool = True, ) -> WorkflowGraph: """ 从 JSON 数据或文件转换为 WorkflowGraph - 支持的 JSON 格式: - 1. {"workflow": [...], "reagent": {...}} - 直接格式 - 2. {"steps_info": [...], "labware_info": [...]} - 需要规范化的格式 + JSON 格式: + {"workflow": [...], "reagent": {...}} Args: data: JSON 文件路径、字典数据、或 JSON 字符串 @@ -251,7 +222,7 @@ def convert_from_json( WorkflowGraph: 构建好的工作流图 Raises: - ValueError: 不支持的 JSON 格式 或 句柄校验失败 + ValueError: 不支持的 JSON 格式 FileNotFoundError: 文件不存在 json.JSONDecodeError: JSON 解析失败 """ @@ -262,7 +233,6 @@ def convert_from_json( with path.open("r", encoding="utf-8") as fp: json_data = json.load(fp) elif isinstance(data, str): - # 尝试作为 JSON 字符串解析 json_data = json.loads(data) else: raise FileNotFoundError(f"文件不存在: {data}") @@ -271,30 +241,24 @@ def convert_from_json( else: raise TypeError(f"不支持的数据类型: {type(data)}") - # 根据格式解析数据 - if "workflow" in json_data and "reagent" in json_data: - # 格式1: workflow/reagent(已经是规范格式) - protocol_steps = json_data["workflow"] - labware_info = json_data["reagent"] - elif "steps_info" in json_data and "labware_info" in json_data: - # 格式2: steps_info/labware_info(需要规范化) - protocol_steps = normalize_steps(json_data["steps_info"]) - labware_info = normalize_labware(json_data["labware_info"]) - elif "steps" in json_data and "labware" in json_data: - # 格式3: steps/labware(另一种常见格式) - protocol_steps = normalize_steps(json_data["steps"]) - if isinstance(json_data["labware"], list): - labware_info = normalize_labware(json_data["labware"]) - else: - labware_info = json_data["labware"] - else: + # 校验格式 + if "workflow" not in json_data or "reagent" not in json_data: raise ValueError( - "不支持的 JSON 格式。支持的格式:\n" - "1. {'workflow': [...], 'reagent': {...}}\n" - "2. {'steps_info': [...], 'labware_info': [...]}\n" - "3. {'steps': [...], 'labware': [...]}" + "不支持的 JSON 格式。请使用标准格式:\n" + '{"workflow": [{"action": "...", "action_args": {...}}, ...], ' + '"reagent": {"name": {"slot": int, "well": [...], "labware": "..."}, ...}}' ) + # 提取数据 + workflow = json_data["workflow"] + reagent = json_data["reagent"] + + # 规范化步骤数据 + protocol_steps = normalize_workflow_steps(workflow) + + # reagent 已经是字典格式,直接使用 + labware_info = reagent + # 构建工作流图 graph = build_protocol_graph( labware_info=labware_info, @@ -317,7 +281,7 @@ def convert_from_json( def convert_json_to_node_link( data: Union[str, PathLike, Dict[str, Any]], - workstation_name: str = "PRCXi", + workstation_name: str = DEFAULT_WORKSTATION, ) -> Dict[str, Any]: """ 将 JSON 数据转换为 node-link 格式的字典 @@ -335,7 +299,7 @@ def convert_json_to_node_link( def convert_json_to_workflow_list( data: Union[str, PathLike, Dict[str, Any]], - workstation_name: str = "PRCXi", + workstation_name: str = DEFAULT_WORKSTATION, ) -> List[Dict[str, Any]]: """ 将 JSON 数据转换为工作流列表格式 @@ -349,8 +313,3 @@ def convert_json_to_workflow_list( """ graph = convert_from_json(data, workstation_name) return graph.to_dict() - - -# 为了向后兼容,保留下划线前缀的别名 -_normalize_steps = normalize_steps -_normalize_labware = normalize_labware diff --git a/unilabos/workflow/legacy/convert_from_json_legacy.py b/unilabos/workflow/legacy/convert_from_json_legacy.py new file mode 100644 index 0000000..7a6d2b4 --- /dev/null +++ b/unilabos/workflow/legacy/convert_from_json_legacy.py @@ -0,0 +1,356 @@ +""" +JSON 工作流转换模块 + +提供从多种 JSON 格式转换为统一工作流格式的功能。 +支持的格式: +1. workflow/reagent 格式 +2. steps_info/labware_info 格式 +""" + +import json +from os import PathLike +from pathlib import Path +from typing import Any, Dict, List, Optional, Set, Tuple, Union + +from unilabos.workflow.common import WorkflowGraph, build_protocol_graph +from unilabos.registry.registry import lab_registry + + +def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List[str]]: + """ + 从 registry 获取指定设备和动作的 handles 配置 + + Args: + resource_name: 设备资源名称,如 "liquid_handler.prcxi" + template_name: 动作模板名称,如 "transfer_liquid" + + Returns: + 包含 source 和 target handler_keys 的字典: + {"source": ["sources_out", "targets_out", ...], "target": ["sources", "targets", ...]} + """ + result = {"source": [], "target": []} + + device_info = lab_registry.device_type_registry.get(resource_name, {}) + if not device_info: + return result + + action_mappings = device_info.get("class", {}).get("action_value_mappings", {}) + action_config = action_mappings.get(template_name, {}) + handles = action_config.get("handles", {}) + + if isinstance(handles, dict): + # 处理 input handles (作为 target) + for handle in handles.get("input", []): + handler_key = handle.get("handler_key", "") + if handler_key: + result["source"].append(handler_key) + # 处理 output handles (作为 source) + for handle in handles.get("output", []): + handler_key = handle.get("handler_key", "") + if handler_key: + result["target"].append(handler_key) + + return result + + +def validate_workflow_handles(graph: WorkflowGraph) -> Tuple[bool, List[str]]: + """ + 校验工作流图中所有边的句柄配置是否正确 + + Args: + graph: 工作流图对象 + + Returns: + (is_valid, errors): 是否有效,错误信息列表 + """ + errors = [] + nodes = graph.nodes + + for edge in graph.edges: + left_uuid = edge.get("source") + right_uuid = edge.get("target") + # target_handle_key是target, right的输入节点(入节点) + # source_handle_key是source, left的输出节点(出节点) + right_source_conn_key = edge.get("target_handle_key", "") + left_target_conn_key = edge.get("source_handle_key", "") + + # 获取源节点和目标节点信息 + left_node = nodes.get(left_uuid, {}) + right_node = nodes.get(right_uuid, {}) + + left_res_name = left_node.get("resource_name", "") + left_template_name = left_node.get("template_name", "") + right_res_name = right_node.get("resource_name", "") + right_template_name = right_node.get("template_name", "") + + # 获取源节点的 output handles + left_node_handles = get_action_handles(left_res_name, left_template_name) + target_valid_keys = left_node_handles.get("target", []) + target_valid_keys.append("ready") + + # 获取目标节点的 input handles + right_node_handles = get_action_handles(right_res_name, right_template_name) + source_valid_keys = right_node_handles.get("source", []) + source_valid_keys.append("ready") + + # 如果节点配置了 output handles,则 source_port 必须有效 + if not right_source_conn_key: + node_name = left_node.get("name", left_uuid[:8]) + errors.append(f"源节点 '{node_name}' 的 source_handle_key 为空," f"应设置为: {source_valid_keys}") + elif right_source_conn_key not in source_valid_keys: + node_name = left_node.get("name", left_uuid[:8]) + errors.append( + f"源节点 '{node_name}' 的 source 端点 '{right_source_conn_key}' 不存在," f"支持的端点: {source_valid_keys}" + ) + + # 如果节点配置了 input handles,则 target_port 必须有效 + if not left_target_conn_key: + node_name = right_node.get("name", right_uuid[:8]) + errors.append(f"目标节点 '{node_name}' 的 target_handle_key 为空," f"应设置为: {target_valid_keys}") + elif left_target_conn_key not in target_valid_keys: + node_name = right_node.get("name", right_uuid[:8]) + errors.append( + f"目标节点 '{node_name}' 的 target 端点 '{left_target_conn_key}' 不存在," + f"支持的端点: {target_valid_keys}" + ) + + return len(errors) == 0, errors + + +# action 到 resource_name 的映射 +ACTION_RESOURCE_MAPPING: Dict[str, str] = { + # 生物实验操作 + "transfer_liquid": "liquid_handler.prcxi", + "transfer": "liquid_handler.prcxi", + "incubation": "incubator.prcxi", + "move_labware": "labware_mover.prcxi", + "oscillation": "shaker.prcxi", + # 有机化学操作 + "HeatChillToTemp": "heatchill.chemputer", + "StopHeatChill": "heatchill.chemputer", + "StartHeatChill": "heatchill.chemputer", + "HeatChill": "heatchill.chemputer", + "Dissolve": "stirrer.chemputer", + "Transfer": "liquid_handler.chemputer", + "Evaporate": "rotavap.chemputer", + "Recrystallize": "reactor.chemputer", + "Filter": "filter.chemputer", + "Dry": "dryer.chemputer", + "Add": "liquid_handler.chemputer", +} + + +def normalize_steps(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + 将不同格式的步骤数据规范化为统一格式 + + 支持的输入格式: + - action + parameters + - action + action_args + - operation + parameters + + Args: + data: 原始步骤数据列表 + + Returns: + 规范化后的步骤列表,格式为 [{"action": str, "parameters": dict, "description": str?, "step_number": int?}, ...] + """ + normalized = [] + for idx, step in enumerate(data): + # 获取动作名称(支持 action 或 operation 字段) + action = step.get("action") or step.get("operation") + if not action: + continue + + # 获取参数(支持 parameters 或 action_args 字段) + raw_params = step.get("parameters") or step.get("action_args") or {} + params = dict(raw_params) + + # 规范化 source/target -> sources/targets + if "source" in raw_params and "sources" not in raw_params: + params["sources"] = raw_params["source"] + if "target" in raw_params and "targets" not in raw_params: + params["targets"] = raw_params["target"] + + # 获取描述(支持 description 或 purpose 字段) + description = step.get("description") or step.get("purpose") + + # 获取步骤编号(优先使用原始数据中的 step_number,否则使用索引+1) + step_number = step.get("step_number", idx + 1) + + step_dict = {"action": action, "parameters": params, "step_number": step_number} + if description: + step_dict["description"] = description + + normalized.append(step_dict) + + return normalized + + +def normalize_labware(data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: + """ + 将不同格式的 labware 数据规范化为统一的字典格式 + + 支持的输入格式: + - reagent_name + material_name + positions + - name + labware + slot + + Args: + data: 原始 labware 数据列表 + + Returns: + 规范化后的 labware 字典,格式为 {name: {"slot": int, "labware": str, "well": list, "type": str, "role": str, "name": str}, ...} + """ + labware = {} + for item in data: + # 获取 key 名称(优先使用 reagent_name,其次是 material_name 或 name) + reagent_name = item.get("reagent_name") + key = reagent_name or item.get("material_name") or item.get("name") + if not key: + continue + + key = str(key) + + # 处理重复 key,自动添加后缀 + idx = 1 + original_key = key + while key in labware: + idx += 1 + key = f"{original_key}_{idx}" + + labware[key] = { + "slot": item.get("positions") or item.get("slot"), + "labware": item.get("material_name") or item.get("labware"), + "well": item.get("well", []), + "type": item.get("type", "reagent"), + "role": item.get("role", ""), + "name": key, + } + + return labware + + +def convert_from_json( + data: Union[str, PathLike, Dict[str, Any]], + workstation_name: str = "PRCXi", + validate: bool = True, +) -> WorkflowGraph: + """ + 从 JSON 数据或文件转换为 WorkflowGraph + + 支持的 JSON 格式: + 1. {"workflow": [...], "reagent": {...}} - 直接格式 + 2. {"steps_info": [...], "labware_info": [...]} - 需要规范化的格式 + + Args: + data: JSON 文件路径、字典数据、或 JSON 字符串 + workstation_name: 工作站名称,默认 "PRCXi" + validate: 是否校验句柄配置,默认 True + + Returns: + WorkflowGraph: 构建好的工作流图 + + Raises: + ValueError: 不支持的 JSON 格式 或 句柄校验失败 + FileNotFoundError: 文件不存在 + json.JSONDecodeError: JSON 解析失败 + """ + # 处理输入数据 + if isinstance(data, (str, PathLike)): + path = Path(data) + if path.exists(): + with path.open("r", encoding="utf-8") as fp: + json_data = json.load(fp) + elif isinstance(data, str): + # 尝试作为 JSON 字符串解析 + json_data = json.loads(data) + else: + raise FileNotFoundError(f"文件不存在: {data}") + elif isinstance(data, dict): + json_data = data + else: + raise TypeError(f"不支持的数据类型: {type(data)}") + + # 根据格式解析数据 + if "workflow" in json_data and "reagent" in json_data: + # 格式1: workflow/reagent(已经是规范格式) + protocol_steps = json_data["workflow"] + labware_info = json_data["reagent"] + elif "steps_info" in json_data and "labware_info" in json_data: + # 格式2: steps_info/labware_info(需要规范化) + protocol_steps = normalize_steps(json_data["steps_info"]) + labware_info = normalize_labware(json_data["labware_info"]) + elif "steps" in json_data and "labware" in json_data: + # 格式3: steps/labware(另一种常见格式) + protocol_steps = normalize_steps(json_data["steps"]) + if isinstance(json_data["labware"], list): + labware_info = normalize_labware(json_data["labware"]) + else: + labware_info = json_data["labware"] + else: + raise ValueError( + "不支持的 JSON 格式。支持的格式:\n" + "1. {'workflow': [...], 'reagent': {...}}\n" + "2. {'steps_info': [...], 'labware_info': [...]}\n" + "3. {'steps': [...], 'labware': [...]}" + ) + + # 构建工作流图 + graph = build_protocol_graph( + labware_info=labware_info, + protocol_steps=protocol_steps, + workstation_name=workstation_name, + action_resource_mapping=ACTION_RESOURCE_MAPPING, + ) + + # 校验句柄配置 + if validate: + is_valid, errors = validate_workflow_handles(graph) + if not is_valid: + import warnings + + for error in errors: + warnings.warn(f"句柄校验警告: {error}") + + return graph + + +def convert_json_to_node_link( + data: Union[str, PathLike, Dict[str, Any]], + workstation_name: str = "PRCXi", +) -> Dict[str, Any]: + """ + 将 JSON 数据转换为 node-link 格式的字典 + + Args: + data: JSON 文件路径、字典数据、或 JSON 字符串 + workstation_name: 工作站名称,默认 "PRCXi" + + Returns: + Dict: node-link 格式的工作流数据 + """ + graph = convert_from_json(data, workstation_name) + return graph.to_node_link_dict() + + +def convert_json_to_workflow_list( + data: Union[str, PathLike, Dict[str, Any]], + workstation_name: str = "PRCXi", +) -> List[Dict[str, Any]]: + """ + 将 JSON 数据转换为工作流列表格式 + + Args: + data: JSON 文件路径、字典数据、或 JSON 字符串 + workstation_name: 工作站名称,默认 "PRCXi" + + Returns: + List: 工作流节点列表 + """ + graph = convert_from_json(data, workstation_name) + return graph.to_dict() + + +# 为了向后兼容,保留下划线前缀的别名 +_normalize_steps = normalize_steps +_normalize_labware = normalize_labware