fix upload workflow json

This commit is contained in:
Xuwznln
2026-02-02 17:18:45 +08:00
parent 07c9e6f0fe
commit b0da149252
5 changed files with 748 additions and 189 deletions

213
tests/workflow/test.json Normal file
View File

@@ -0,0 +1,213 @@
{
"workflow": [
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines",
"targets": "Liquid_1",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
},
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines",
"targets": "Liquid_2",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
},
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines",
"targets": "Liquid_3",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
},
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines_2",
"targets": "Liquid_4",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
},
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines_2",
"targets": "Liquid_5",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
},
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines_2",
"targets": "Liquid_6",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
},
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines_3",
"targets": "dest_set",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
},
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines_3",
"targets": "dest_set_2",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
},
{
"action": "transfer_liquid",
"action_args": {
"sources": "cell_lines_3",
"targets": "dest_set_3",
"asp_vol": 100.0,
"dis_vol": 74.75,
"asp_flow_rate": 94.0,
"dis_flow_rate": 95.5
}
}
],
"reagent": {
"Liquid_1": {
"slot": 1,
"well": [
"A4",
"A7",
"A10"
],
"labware": "rep 1"
},
"Liquid_4": {
"slot": 1,
"well": [
"A4",
"A7",
"A10"
],
"labware": "rep 1"
},
"dest_set": {
"slot": 1,
"well": [
"A4",
"A7",
"A10"
],
"labware": "rep 1"
},
"Liquid_2": {
"slot": 2,
"well": [
"A3",
"A5",
"A8"
],
"labware": "rep 2"
},
"Liquid_5": {
"slot": 2,
"well": [
"A3",
"A5",
"A8"
],
"labware": "rep 2"
},
"dest_set_2": {
"slot": 2,
"well": [
"A3",
"A5",
"A8"
],
"labware": "rep 2"
},
"Liquid_3": {
"slot": 3,
"well": [
"A4",
"A6",
"A10"
],
"labware": "rep 3"
},
"Liquid_6": {
"slot": 3,
"well": [
"A4",
"A6",
"A10"
],
"labware": "rep 3"
},
"dest_set_3": {
"slot": 3,
"well": [
"A4",
"A6",
"A10"
],
"labware": "rep 3"
},
"cell_lines": {
"slot": 4,
"well": [
"A1",
"A3",
"A5"
],
"labware": "DRUG + YOYO-MEDIA"
},
"cell_lines_2": {
"slot": 4,
"well": [
"A1",
"A3",
"A5"
],
"labware": "DRUG + YOYO-MEDIA"
},
"cell_lines_3": {
"slot": 4,
"well": [
"A1",
"A3",
"A5"
],
"labware": "DRUG + YOYO-MEDIA"
}
}
}

View File

@@ -361,7 +361,7 @@ class HTTPClient:
""" """
# target_lab_uuid 暂时使用默认值,后续由后端根据 ak/sk 获取 # target_lab_uuid 暂时使用默认值,后续由后端根据 ak/sk 获取
payload = { payload = {
"target_lab_uuid": "28c38bb0-63f6-4352-b0d8-b5b8eb1766d5", "target_lab_uuid": "cf44e98c-7f3e-4175-b526-1fa338b43f65",
"name": name, "name": name,
"data": { "data": {
"workflow_uuid": workflow_uuid, "workflow_uuid": workflow_uuid,

View File

@@ -8,6 +8,20 @@ from typing import Dict, List, Any, Tuple, Optional
Json = Dict[str, Any] Json = Dict[str, Any]
# ==================== 默认配置 ====================
# create_resource 节点默认参数
CREATE_RESOURCE_DEFAULTS = {
"device_id": "/PRCXI",
"parent": "/PRCXI/PRCXI_Deck",
"class_name": "PRCXI_BioER_96_wellplate",
}
# 默认液体体积 (uL)
DEFAULT_LIQUID_VOLUME = 1e5
# ---------------- Graph ---------------- # ---------------- Graph ----------------
@@ -228,7 +242,7 @@ def refactor_data(
def build_protocol_graph( def build_protocol_graph(
labware_info: List[Dict[str, Any]], labware_info: Dict[str, Dict[str, Any]],
protocol_steps: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]],
workstation_name: str, workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None, action_resource_mapping: Optional[Dict[str, str]] = None,
@@ -236,7 +250,7 @@ def build_protocol_graph(
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑 """统一的协议图构建函数,根据设备类型自动选择构建逻辑
Args: Args:
labware_info: labware 信息字典 labware_info: labware 信息字典,格式为 {name: {slot, well, labware, ...}, ...}
protocol_steps: 协议步骤列表 protocol_steps: 协议步骤列表
workstation_name: 工作站名称 workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选 action_resource_mapping: action 到 resource_name 的映射字典,可选
@@ -251,13 +265,21 @@ def build_protocol_graph(
# 为所有labware创建资源节点 # 为所有labware创建资源节点
res_index = 0 res_index = 0
for labware_id, item in labware_info.items(): for labware_id, item in labware_info.items():
# item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}")
node_id = str(uuid.uuid4()) node_id = str(uuid.uuid4())
# res_id 不能有空格,替换为下划线
res_id = str(labware_id).replace(" ", "_")
# 从 reagent 数据中获取 well 信息
wells = item.get("well", [])
slot = str(item.get("slot", "")) # slot_on_deck 是字符串
well_count = len(wells) if wells else 1
# 判断节点类型 # 判断节点类型
if "Rack" in str(labware_id) or "Tip" in str(labware_id): if "Rack" in str(labware_id) or "Tip" in str(labware_id):
lab_node_type = "Labware" lab_node_type = "Labware"
description = f"Prepare Labware: {labware_id}" description = f"Prepare Labware: {labware_id}"
liquid_input_slot = wells if wells else [-1]
liquid_type = [] liquid_type = []
liquid_volume = [] liquid_volume = []
elif item.get("type") == "hardware" or "reactor" in str(labware_id).lower(): elif item.get("type") == "hardware" or "reactor" in str(labware_id).lower():
@@ -265,13 +287,16 @@ def build_protocol_graph(
continue continue
lab_node_type = "Sample" lab_node_type = "Sample"
description = f"Prepare Reactor: {labware_id}" description = f"Prepare Reactor: {labware_id}"
liquid_input_slot = wells if wells else [-1]
liquid_type = [] liquid_type = []
liquid_volume = [] liquid_volume = []
else: else:
lab_node_type = "Reagent" lab_node_type = "Reagent"
description = f"Add Reagent to Flask: {labware_id}" description = f"Add Reagent to Flask: {labware_id}"
liquid_type = [labware_id] # liquid_input_slot, liquid_type, liquid_volume 数量与 wells 保持一致
liquid_volume = [1e5] liquid_input_slot = wells if wells else [-1]
liquid_type = [res_id] * well_count
liquid_volume = [DEFAULT_LIQUID_VOLUME] * well_count
res_index += 1 res_index += 1
G.add_node( G.add_node(
@@ -283,21 +308,46 @@ def build_protocol_graph(
lab_node_type=lab_node_type, lab_node_type=lab_node_type,
footer="create_resource-host_node", footer="create_resource-host_node",
param={ param={
"res_id": labware_id, "res_id": res_id,
"device_id": WORKSTATION_ID, "device_id": CREATE_RESOURCE_DEFAULTS["device_id"],
"class_name": "container", "class_name": CREATE_RESOURCE_DEFAULTS["class_name"],
"parent": WORKSTATION_ID, "parent": CREATE_RESOURCE_DEFAULTS["parent"],
"bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0}, "bind_locations": {"x": 0.0, "y": 0.0, "z": 0.0},
"liquid_input_slot": [-1], "liquid_input_slot": liquid_input_slot,
"liquid_type": liquid_type, "liquid_type": liquid_type,
"liquid_volume": liquid_volume, "liquid_volume": liquid_volume,
"slot_on_deck": "", "slot_on_deck": slot,
}, },
) )
resource_last_writer[labware_id] = f"{node_id}:labware" # create_resource 节点输出 liquid_slots用于连接 transfer_liquid 的 sources/targets
resource_last_writer[labware_id] = f"{node_id}:liquid_slots"
last_control_node_id = None last_control_node_id = None
# 端口名称映射JSON 字段名 -> 实际 handle key
INPUT_PORT_MAPPING = {
"sources": "sources_identifier",
"targets": "targets_identifier",
"vessel": "vessel",
"to_vessel": "to_vessel",
"from_vessel": "from_vessel",
"reagent": "reagent",
"solvent": "solvent",
"compound": "compound",
}
OUTPUT_PORT_MAPPING = {
"sources": "sources_out", # 输出端口是 xxx_out
"targets": "targets_out", # 输出端口是 xxx_out
"vessel": "vessel_out",
"to_vessel": "to_vessel_out",
"from_vessel": "from_vessel_out",
"filtrate_vessel": "filtrate_out",
"reagent": "reagent",
"solvent": "solvent",
"compound": "compound",
}
# 处理协议步骤 # 处理协议步骤
for step in protocol_steps: for step in protocol_steps:
node_id = str(uuid.uuid4()) node_id = str(uuid.uuid4())
@@ -310,38 +360,19 @@ def build_protocol_graph(
# 物料流 # 物料流
params = step.get("param", {}) params = step.get("param", {})
input_resources_possible_names = [
"vessel",
"to_vessel",
"from_vessel",
"reagent",
"solvent",
"compound",
"sources",
"targets",
]
for target_port in input_resources_possible_names: # 处理输入连接
resource_name = params.get(target_port) for param_key, target_port in INPUT_PORT_MAPPING.items():
resource_name = params.get(param_key)
if resource_name and resource_name in resource_last_writer: if resource_name and resource_name in resource_last_writer:
source_node, source_port = resource_last_writer[resource_name].split(":") source_node, source_port = resource_last_writer[resource_name].split(":")
G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port) G.add_edge(source_node, node_id, source_port=source_port, target_port=target_port)
output_resources = { # 处理输出:更新 resource_last_writer
"vessel_out": params.get("vessel"), for param_key, output_port in OUTPUT_PORT_MAPPING.items():
"from_vessel_out": params.get("from_vessel"), resource_name = params.get(param_key)
"to_vessel_out": params.get("to_vessel"),
"filtrate_out": params.get("filtrate_vessel"),
"reagent": params.get("reagent"),
"solvent": params.get("solvent"),
"compound": params.get("compound"),
"sources_out": params.get("sources"),
"targets_out": params.get("targets"),
}
for source_port, resource_name in output_resources.items():
if resource_name: if resource_name:
resource_last_writer[resource_name] = f"{node_id}:{source_port}" resource_last_writer[resource_name] = f"{node_id}:{output_port}"
return G return G

View File

@@ -1,21 +1,68 @@
""" """
JSON 工作流转换模块 JSON 工作流转换模块
提供从多种 JSON 格式转换为统一工作流格式的功能 将 workflow/reagent 格式的 JSON 转换为统一工作流格式。
支持的格式:
1. workflow/reagent 格式 输入格式:
2. steps_info/labware_info 格式 {
"workflow": [
{"action": "...", "action_args": {...}},
...
],
"reagent": {
"reagent_name": {"slot": int, "well": [...], "labware": "..."},
...
}
}
""" """
import json import json
from os import PathLike from os import PathLike
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
from unilabos.workflow.common import WorkflowGraph, build_protocol_graph from unilabos.workflow.common import WorkflowGraph, build_protocol_graph
from unilabos.registry.registry import lab_registry from unilabos.registry.registry import lab_registry
# ==================== 字段映射配置 ====================
# action 到 resource_name 的映射
ACTION_RESOURCE_MAPPING: Dict[str, str] = {
# 生物实验操作
"transfer_liquid": "liquid_handler.prcxi",
"transfer": "liquid_handler.prcxi",
"incubation": "incubator.prcxi",
"move_labware": "labware_mover.prcxi",
"oscillation": "shaker.prcxi",
# 有机化学操作
"HeatChillToTemp": "heatchill.chemputer",
"StopHeatChill": "heatchill.chemputer",
"StartHeatChill": "heatchill.chemputer",
"HeatChill": "heatchill.chemputer",
"Dissolve": "stirrer.chemputer",
"Transfer": "liquid_handler.chemputer",
"Evaporate": "rotavap.chemputer",
"Recrystallize": "reactor.chemputer",
"Filter": "filter.chemputer",
"Dry": "dryer.chemputer",
"Add": "liquid_handler.chemputer",
}
# action_args 字段到 parameters 字段的映射
# 格式: {"old_key": "new_key"}, 仅映射需要重命名的字段
ARGS_FIELD_MAPPING: Dict[str, str] = {
# 如果需要字段重命名,在这里配置
# "old_field_name": "new_field_name",
}
# 默认工作站名称
DEFAULT_WORKSTATION = "PRCXI"
# ==================== 核心转换函数 ====================
def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List[str]]: def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List[str]]:
""" """
从 registry 获取指定设备和动作的 handles 配置 从 registry 获取指定设备和动作的 handles 配置
@@ -39,12 +86,10 @@ def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List
handles = action_config.get("handles", {}) handles = action_config.get("handles", {})
if isinstance(handles, dict): if isinstance(handles, dict):
# 处理 input handles (作为 target)
for handle in handles.get("input", []): for handle in handles.get("input", []):
handler_key = handle.get("handler_key", "") handler_key = handle.get("handler_key", "")
if handler_key: if handler_key:
result["source"].append(handler_key) result["source"].append(handler_key)
# 处理 output handles (作为 source)
for handle in handles.get("output", []): for handle in handles.get("output", []):
handler_key = handle.get("handler_key", "") handler_key = handle.get("handler_key", "")
if handler_key: if handler_key:
@@ -69,12 +114,9 @@ def validate_workflow_handles(graph: WorkflowGraph) -> Tuple[bool, List[str]]:
for edge in graph.edges: for edge in graph.edges:
left_uuid = edge.get("source") left_uuid = edge.get("source")
right_uuid = edge.get("target") right_uuid = edge.get("target")
# target_handle_key是target, right的输入节点入节点
# source_handle_key是source, left的输出节点出节点
right_source_conn_key = edge.get("target_handle_key", "") right_source_conn_key = edge.get("target_handle_key", "")
left_target_conn_key = edge.get("source_handle_key", "") left_target_conn_key = edge.get("source_handle_key", "")
# 获取源节点和目标节点信息
left_node = nodes.get(left_uuid, {}) left_node = nodes.get(left_uuid, {})
right_node = nodes.get(right_uuid, {}) right_node = nodes.get(right_uuid, {})
@@ -83,164 +125,93 @@ def validate_workflow_handles(graph: WorkflowGraph) -> Tuple[bool, List[str]]:
right_res_name = right_node.get("resource_name", "") right_res_name = right_node.get("resource_name", "")
right_template_name = right_node.get("template_name", "") right_template_name = right_node.get("template_name", "")
# 获取源节点的 output handles
left_node_handles = get_action_handles(left_res_name, left_template_name) left_node_handles = get_action_handles(left_res_name, left_template_name)
target_valid_keys = left_node_handles.get("target", []) target_valid_keys = left_node_handles.get("target", [])
target_valid_keys.append("ready") target_valid_keys.append("ready")
# 获取目标节点的 input handles
right_node_handles = get_action_handles(right_res_name, right_template_name) right_node_handles = get_action_handles(right_res_name, right_template_name)
source_valid_keys = right_node_handles.get("source", []) source_valid_keys = right_node_handles.get("source", [])
source_valid_keys.append("ready") source_valid_keys.append("ready")
# 如果节点配置了 output handles则 source_port 必须有效 # 验证目标节点right的输入端口
if not right_source_conn_key: if not right_source_conn_key:
node_name = left_node.get("name", left_uuid[:8]) node_name = right_node.get("name", right_uuid[:8])
errors.append(f"节点 '{node_name}' source_handle_key 为空," f"应设置为: {source_valid_keys}") errors.append(f"目标节点 '{node_name}'输入端口 (target_handle_key) 为空,应设置为: {source_valid_keys}")
elif right_source_conn_key not in source_valid_keys: elif right_source_conn_key not in source_valid_keys:
node_name = left_node.get("name", left_uuid[:8]) node_name = right_node.get("name", right_uuid[:8])
errors.append( errors.append(
f"节点 '{node_name}' source 端点 '{right_source_conn_key}' 不存在," f"支持的端点: {source_valid_keys}" f"目标节点 '{node_name}'输入端口 '{right_source_conn_key}' 不存在,支持的输入端口: {source_valid_keys}"
) )
# 如果节点配置了 input handles则 target_port 必须有效 # 验证源节点left的输出端口
if not left_target_conn_key: if not left_target_conn_key:
node_name = right_node.get("name", right_uuid[:8]) node_name = left_node.get("name", left_uuid[:8])
errors.append(f"目标节点 '{node_name}' target_handle_key 为空," f"应设置为: {target_valid_keys}") errors.append(f"节点 '{node_name}'输出端口 (source_handle_key) 为空,应设置为: {target_valid_keys}")
elif left_target_conn_key not in target_valid_keys: elif left_target_conn_key not in target_valid_keys:
node_name = right_node.get("name", right_uuid[:8]) node_name = left_node.get("name", left_uuid[:8])
errors.append( errors.append(
f"目标节点 '{node_name}' target 端点 '{left_target_conn_key}' 不存在," f"节点 '{node_name}'输出端口 '{left_target_conn_key}' 不存在,支持的输出端口: {target_valid_keys}"
f"支持的端点: {target_valid_keys}"
) )
return len(errors) == 0, errors return len(errors) == 0, errors
# action 到 resource_name 的映射 def normalize_workflow_steps(workflow: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
ACTION_RESOURCE_MAPPING: Dict[str, str] = {
# 生物实验操作
"transfer_liquid": "liquid_handler.prcxi",
"transfer": "liquid_handler.prcxi",
"incubation": "incubator.prcxi",
"move_labware": "labware_mover.prcxi",
"oscillation": "shaker.prcxi",
# 有机化学操作
"HeatChillToTemp": "heatchill.chemputer",
"StopHeatChill": "heatchill.chemputer",
"StartHeatChill": "heatchill.chemputer",
"HeatChill": "heatchill.chemputer",
"Dissolve": "stirrer.chemputer",
"Transfer": "liquid_handler.chemputer",
"Evaporate": "rotavap.chemputer",
"Recrystallize": "reactor.chemputer",
"Filter": "filter.chemputer",
"Dry": "dryer.chemputer",
"Add": "liquid_handler.chemputer",
}
def normalize_steps(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
""" """
不同格式的步骤数据规范化为统一格式 workflow 格式的步骤数据规范化
支持的输入格式 输入格式:
- action + parameters [{"action": "...", "action_args": {...}}, ...]
- action + action_args
- operation + parameters 输出格式:
[{"action": "...", "parameters": {...}, "step_number": int}, ...]
Args: Args:
data: 原始步骤数据列表 workflow: workflow 数组
Returns: Returns:
规范化后的步骤列表,格式为 [{"action": str, "parameters": dict, "description": str?, "step_number": int?}, ...] 规范化后的步骤列表
""" """
normalized = [] normalized = []
for idx, step in enumerate(data): for idx, step in enumerate(workflow):
# 获取动作名称(支持 action 或 operation 字段) action = step.get("action")
action = step.get("action") or step.get("operation")
if not action: if not action:
continue continue
# 获取参数(支持 parameters 或 action_args 字段) # 获取参数: action_args
raw_params = step.get("parameters") or step.get("action_args") or {} raw_params = step.get("action_args", {})
params = dict(raw_params) params = {}
# 规范化 source/target -> sources/targets # 应用字段映射
if "source" in raw_params and "sources" not in raw_params: for key, value in raw_params.items():
params["sources"] = raw_params["source"] mapped_key = ARGS_FIELD_MAPPING.get(key, key)
if "target" in raw_params and "targets" not in raw_params: params[mapped_key] = value
params["targets"] = raw_params["target"]
# 获取描述(支持 description 或 purpose 字段) step_dict = {
description = step.get("description") or step.get("purpose") "action": action,
"parameters": params,
"step_number": idx + 1,
}
# 获取步骤编号(优先使用原始数据中的 step_number否则使用索引+1 # 保留描述字段
step_number = step.get("step_number", idx + 1) if "description" in step:
step_dict["description"] = step["description"]
step_dict = {"action": action, "parameters": params, "step_number": step_number}
if description:
step_dict["description"] = description
normalized.append(step_dict) normalized.append(step_dict)
return normalized return normalized
def normalize_labware(data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""
将不同格式的 labware 数据规范化为统一的字典格式
支持的输入格式:
- reagent_name + material_name + positions
- name + labware + slot
Args:
data: 原始 labware 数据列表
Returns:
规范化后的 labware 字典,格式为 {name: {"slot": int, "labware": str, "well": list, "type": str, "role": str, "name": str}, ...}
"""
labware = {}
for item in data:
# 获取 key 名称(优先使用 reagent_name其次是 material_name 或 name
reagent_name = item.get("reagent_name")
key = reagent_name or item.get("material_name") or item.get("name")
if not key:
continue
key = str(key)
# 处理重复 key自动添加后缀
idx = 1
original_key = key
while key in labware:
idx += 1
key = f"{original_key}_{idx}"
labware[key] = {
"slot": item.get("positions") or item.get("slot"),
"labware": item.get("material_name") or item.get("labware"),
"well": item.get("well", []),
"type": item.get("type", "reagent"),
"role": item.get("role", ""),
"name": key,
}
return labware
def convert_from_json( def convert_from_json(
data: Union[str, PathLike, Dict[str, Any]], data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi", workstation_name: str = DEFAULT_WORKSTATION,
validate: bool = True, validate: bool = True,
) -> WorkflowGraph: ) -> WorkflowGraph:
""" """
从 JSON 数据或文件转换为 WorkflowGraph 从 JSON 数据或文件转换为 WorkflowGraph
支持的 JSON 格式 JSON 格式:
1. {"workflow": [...], "reagent": {...}} - 直接格式 {"workflow": [...], "reagent": {...}}
2. {"steps_info": [...], "labware_info": [...]} - 需要规范化的格式
Args: Args:
data: JSON 文件路径、字典数据、或 JSON 字符串 data: JSON 文件路径、字典数据、或 JSON 字符串
@@ -251,7 +222,7 @@ def convert_from_json(
WorkflowGraph: 构建好的工作流图 WorkflowGraph: 构建好的工作流图
Raises: Raises:
ValueError: 不支持的 JSON 格式 或 句柄校验失败 ValueError: 不支持的 JSON 格式
FileNotFoundError: 文件不存在 FileNotFoundError: 文件不存在
json.JSONDecodeError: JSON 解析失败 json.JSONDecodeError: JSON 解析失败
""" """
@@ -262,7 +233,6 @@ def convert_from_json(
with path.open("r", encoding="utf-8") as fp: with path.open("r", encoding="utf-8") as fp:
json_data = json.load(fp) json_data = json.load(fp)
elif isinstance(data, str): elif isinstance(data, str):
# 尝试作为 JSON 字符串解析
json_data = json.loads(data) json_data = json.loads(data)
else: else:
raise FileNotFoundError(f"文件不存在: {data}") raise FileNotFoundError(f"文件不存在: {data}")
@@ -271,30 +241,24 @@ def convert_from_json(
else: else:
raise TypeError(f"不支持的数据类型: {type(data)}") raise TypeError(f"不支持的数据类型: {type(data)}")
# 根据格式解析数据 # 校验格式
if "workflow" in json_data and "reagent" in json_data: if "workflow" not in json_data or "reagent" not in json_data:
# 格式1: workflow/reagent已经是规范格式
protocol_steps = json_data["workflow"]
labware_info = json_data["reagent"]
elif "steps_info" in json_data and "labware_info" in json_data:
# 格式2: steps_info/labware_info需要规范化
protocol_steps = normalize_steps(json_data["steps_info"])
labware_info = normalize_labware(json_data["labware_info"])
elif "steps" in json_data and "labware" in json_data:
# 格式3: steps/labware另一种常见格式
protocol_steps = normalize_steps(json_data["steps"])
if isinstance(json_data["labware"], list):
labware_info = normalize_labware(json_data["labware"])
else:
labware_info = json_data["labware"]
else:
raise ValueError( raise ValueError(
"不支持的 JSON 格式。支持的格式\n" "不支持的 JSON 格式。请使用标准格式:\n"
"1. {'workflow': [...], 'reagent': {...}}\n" '{"workflow": [{"action": "...", "action_args": {...}}, ...], '
"2. {'steps_info': [...], 'labware_info': [...]}\n" '"reagent": {"name": {"slot": int, "well": [...], "labware": "..."}, ...}}'
"3. {'steps': [...], 'labware': [...]}"
) )
# 提取数据
workflow = json_data["workflow"]
reagent = json_data["reagent"]
# 规范化步骤数据
protocol_steps = normalize_workflow_steps(workflow)
# reagent 已经是字典格式,直接使用
labware_info = reagent
# 构建工作流图 # 构建工作流图
graph = build_protocol_graph( graph = build_protocol_graph(
labware_info=labware_info, labware_info=labware_info,
@@ -317,7 +281,7 @@ def convert_from_json(
def convert_json_to_node_link( def convert_json_to_node_link(
data: Union[str, PathLike, Dict[str, Any]], data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi", workstation_name: str = DEFAULT_WORKSTATION,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
""" """
将 JSON 数据转换为 node-link 格式的字典 将 JSON 数据转换为 node-link 格式的字典
@@ -335,7 +299,7 @@ def convert_json_to_node_link(
def convert_json_to_workflow_list( def convert_json_to_workflow_list(
data: Union[str, PathLike, Dict[str, Any]], data: Union[str, PathLike, Dict[str, Any]],
workstation_name: str = "PRCXi", workstation_name: str = DEFAULT_WORKSTATION,
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
""" """
将 JSON 数据转换为工作流列表格式 将 JSON 数据转换为工作流列表格式
@@ -349,8 +313,3 @@ def convert_json_to_workflow_list(
""" """
graph = convert_from_json(data, workstation_name) graph = convert_from_json(data, workstation_name)
return graph.to_dict() return graph.to_dict()
# 为了向后兼容,保留下划线前缀的别名
_normalize_steps = normalize_steps
_normalize_labware = normalize_labware

View File

@@ -0,0 +1,356 @@
"""
JSON 工作流转换模块
提供从多种 JSON 格式转换为统一工作流格式的功能。
支持的格式:
1. workflow/reagent 格式
2. steps_info/labware_info 格式
"""
import json
from os import PathLike
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from unilabos.workflow.common import WorkflowGraph, build_protocol_graph
from unilabos.registry.registry import lab_registry
def get_action_handles(resource_name: str, template_name: str) -> Dict[str, List[str]]:
    """
    Look up the handle keys registered for one action of one device.

    The lookup goes through ``lab_registry.device_type_registry`` and then
    ``class -> action_value_mappings -> <template_name> -> handles``.

    Args:
        resource_name: Device resource name, e.g. "liquid_handler.prcxi".
        template_name: Action template name, e.g. "transfer_liquid".

    Returns:
        A dict with two lists of ``handler_key`` strings:

        * ``"source"``: keys of the action's *input* handles
          (the keys an incoming edge may connect to).
        * ``"target"``: keys of the action's *output* handles
          (the keys an outgoing edge may start from).

        Both lists are empty when the device, the action, or a dict-shaped
        ``handles`` entry cannot be found. A fresh dict is returned on every
        call, so callers may mutate the lists.
    """
    result: Dict[str, List[str]] = {"source": [], "target": []}

    device_info = lab_registry.device_type_registry.get(resource_name, {})
    if not device_info:
        return result

    # `or {}` / `or []` keep the lookup chain alive when a key is present
    # but explicitly set to null in the registry data.
    class_info = device_info.get("class") or {}
    action_mappings = class_info.get("action_value_mappings") or {}
    action_config = action_mappings.get(template_name) or {}
    handles = action_config.get("handles") or {}
    if not isinstance(handles, dict):
        return result

    # NOTE: the bucket names are intentionally "crossed": input handles are
    # stored under "source" and output handles under "target".
    for direction, bucket in (("input", "source"), ("output", "target")):
        for handle in handles.get(direction) or []:
            if not isinstance(handle, dict):
                continue
            handler_key = handle.get("handler_key", "")
            if handler_key:
                result[bucket].append(handler_key)

    return result
def validate_workflow_handles(graph: WorkflowGraph) -> Tuple[bool, List[str]]:
    """
    Check that every edge of the workflow graph uses handle keys that exist.

    For each edge:

    * ``target_handle_key`` is the input port on the edge's *target* node and
      must be one of that node's input handle keys;
    * ``source_handle_key`` is the output port on the edge's *source* node and
      must be one of that node's output handle keys.

    The key "ready" is always accepted on both sides.

    Args:
        graph: Workflow graph; only ``graph.nodes`` (mapping of node id to
            node dict) and ``graph.edges`` (iterable of edge dicts) are read.

    Returns:
        ``(is_valid, errors)`` where ``errors`` is the list of problem
        descriptions and ``is_valid`` is True when that list is empty.
    """
    errors: List[str] = []
    nodes = graph.nodes

    for edge in graph.edges:
        left_uuid = edge.get("source")
        right_uuid = edge.get("target")
        # Input port on the target (right) node / output port on the source (left) node.
        right_source_conn_key = edge.get("target_handle_key", "")
        left_target_conn_key = edge.get("source_handle_key", "")

        left_node = nodes.get(left_uuid, {})
        right_node = nodes.get(right_uuid, {})

        # str(...) keeps the fallback name usable even when the edge lacks an id.
        left_name = left_node.get("name", str(left_uuid)[:8])
        right_name = right_node.get("name", str(right_uuid)[:8])

        # Output handle keys of the source node ("target" bucket of get_action_handles).
        left_node_handles = get_action_handles(
            left_node.get("resource_name", ""), left_node.get("template_name", "")
        )
        target_valid_keys = left_node_handles.get("target", [])
        target_valid_keys.append("ready")

        # Input handle keys of the target node ("source" bucket of get_action_handles).
        right_node_handles = get_action_handles(
            right_node.get("resource_name", ""), right_node.get("template_name", "")
        )
        source_valid_keys = right_node_handles.get("source", [])
        source_valid_keys.append("ready")

        # Validate the input port of the target (right) node.
        if not right_source_conn_key:
            errors.append(
                f"目标节点 '{right_name}' 的输入端口 (target_handle_key) 为空,"
                f"应设置为: {source_valid_keys}"
            )
        elif right_source_conn_key not in source_valid_keys:
            errors.append(
                f"目标节点 '{right_name}' 的输入端口 '{right_source_conn_key}' 不存在,"
                f"支持的输入端口: {source_valid_keys}"
            )

        # Validate the output port of the source (left) node.
        if not left_target_conn_key:
            errors.append(
                f"源节点 '{left_name}' 的输出端口 (source_handle_key) 为空,"
                f"应设置为: {target_valid_keys}"
            )
        elif left_target_conn_key not in target_valid_keys:
            errors.append(
                f"源节点 '{left_name}' 的输出端口 '{left_target_conn_key}' 不存在,"
                f"支持的输出端口: {target_valid_keys}"
            )

    return len(errors) == 0, errors
# Mapping from an action name to the resource_name of the device that runs it.
ACTION_RESOURCE_MAPPING: Dict[str, str] = {
    # Biology experiment operations
    "transfer_liquid": "liquid_handler.prcxi",
    "transfer": "liquid_handler.prcxi",
    "incubation": "incubator.prcxi",
    "move_labware": "labware_mover.prcxi",
    "oscillation": "shaker.prcxi",
    # Organic chemistry operations
    "HeatChillToTemp": "heatchill.chemputer",
    "StopHeatChill": "heatchill.chemputer",
    "StartHeatChill": "heatchill.chemputer",
    "HeatChill": "heatchill.chemputer",
    "Dissolve": "stirrer.chemputer",
    "Transfer": "liquid_handler.chemputer",
    "Evaporate": "rotavap.chemputer",
    "Recrystallize": "reactor.chemputer",
    "Filter": "filter.chemputer",
    "Dry": "dryer.chemputer",
    "Add": "liquid_handler.chemputer",
}
def normalize_steps(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Bring step records written in several dialects into one common shape.

    Accepted input spellings:
    - action + parameters
    - action + action_args
    - operation + parameters

    Steps that carry neither ``action`` nor ``operation`` are dropped.
    A singular ``source`` / ``target`` argument is additionally exposed as
    ``sources`` / ``targets`` when the plural key is absent.

    Args:
        data: Raw list of step dicts.

    Returns:
        List of dicts shaped like
        ``{"action": str, "parameters": dict, "step_number": int, "description": str?}``.
        ``step_number`` comes from the step itself when present, otherwise it
        is the 1-based position in ``data``; ``description`` falls back to the
        step's ``purpose`` and is omitted when empty.
    """
    result = []
    for position, entry in enumerate(data, start=1):
        action_name = entry.get("action") or entry.get("operation")
        if not action_name:
            continue

        source_args = entry.get("parameters") or entry.get("action_args") or {}
        parameters = dict(source_args)
        # Mirror singular keys onto their plural names without overriding.
        for singular, plural in (("source", "sources"), ("target", "targets")):
            if singular in source_args and plural not in source_args:
                parameters[plural] = source_args[singular]

        record = {
            "action": action_name,
            "parameters": parameters,
            "step_number": entry.get("step_number", position),
        }
        note = entry.get("description") or entry.get("purpose")
        if note:
            record["description"] = note
        result.append(record)

    return result
def normalize_labware(data: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """
    Turn a list of labware records into a dict keyed by a unique name.

    Accepted input spellings:
    - reagent_name + material_name + positions
    - name + labware + slot

    The key is the first non-empty value among ``reagent_name``,
    ``material_name`` and ``name``; records with none of them are skipped.
    When a key is already taken, ``_2``, ``_3``, ... is appended until it is
    unique.

    Args:
        data: Raw list of labware dicts.

    Returns:
        Dict shaped like
        ``{name: {"slot", "labware", "well", "type", "role", "name"}, ...}``.
    """
    table = {}
    for record in data:
        raw_key = record.get("reagent_name") or record.get("material_name") or record.get("name")
        if not raw_key:
            continue

        base = str(raw_key)
        unique = base
        suffix = 2
        # Duplicates get a numeric suffix starting at 2.
        while unique in table:
            unique = f"{base}_{suffix}"
            suffix += 1

        entry = {}
        entry["slot"] = record.get("positions") or record.get("slot")
        entry["labware"] = record.get("material_name") or record.get("labware")
        entry["well"] = record.get("well", [])
        entry["type"] = record.get("type", "reagent")
        entry["role"] = record.get("role", "")
        entry["name"] = unique
        table[unique] = entry

    return table
def convert_from_json(
    data: Union[str, PathLike, Dict[str, Any]],
    workstation_name: str = "PRCXi",
    validate: bool = True,
) -> WorkflowGraph:
    """
    Build a WorkflowGraph from JSON data, a JSON string, or a JSON file.

    Supported top-level layouts:
    1. {"workflow": [...], "reagent": {...}}
    2. {"steps_info": [...], "labware_info": [...]}
    3. {"steps": [...], "labware": [...] or {...}}

    In every layout the steps go through ``normalize_steps`` (so both
    ``parameters`` and ``action_args`` spellings are accepted), and labware
    given as a list goes through ``normalize_labware``; labware that is
    already a dict is used as-is.

    Args:
        data: Path to a JSON file, an already-parsed dict, or a JSON string.
        workstation_name: Workstation name passed to ``build_protocol_graph``.
        validate: When True, run ``validate_workflow_handles`` on the result.
            Problems are reported through ``warnings.warn``; they do not
            raise.

    Returns:
        WorkflowGraph: The constructed workflow graph.

    Raises:
        ValueError: The JSON does not match any supported layout.
        FileNotFoundError: ``data`` is a PathLike that does not exist.
        TypeError: ``data`` is neither str, PathLike nor dict.
        json.JSONDecodeError: The text could not be parsed as JSON.
    """
    # ---- Load the input ----
    if isinstance(data, (str, PathLike)):
        path = Path(data)
        try:
            path_exists = path.exists()
        except (OSError, ValueError):
            # An inline JSON string can be too long (or otherwise invalid) to
            # be probed as a file name; treat it as "not a file".
            path_exists = False

        if path_exists:
            with path.open("r", encoding="utf-8") as fp:
                json_data = json.load(fp)
        elif isinstance(data, str):
            # Not an existing file: interpret the string as JSON text.
            json_data = json.loads(data)
        else:
            raise FileNotFoundError(f"文件不存在: {data}")
    elif isinstance(data, dict):
        json_data = data
    else:
        raise TypeError(f"不支持的数据类型: {type(data)}")

    # ---- Pick the raw steps / labware according to the layout ----
    if "workflow" in json_data and "reagent" in json_data:
        raw_steps, raw_labware = json_data["workflow"], json_data["reagent"]
    elif "steps_info" in json_data and "labware_info" in json_data:
        raw_steps, raw_labware = json_data["steps_info"], json_data["labware_info"]
    elif "steps" in json_data and "labware" in json_data:
        raw_steps, raw_labware = json_data["steps"], json_data["labware"]
    else:
        raise ValueError(
            "不支持的 JSON 格式。支持的格式:\n"
            "1. {'workflow': [...], 'reagent': {...}}\n"
            "2. {'steps_info': [...], 'labware_info': [...]}\n"
            "3. {'steps': [...], 'labware': [...]}"
        )

    # Normalize uniformly so "action_args"-style workflow steps end up with
    # the same "parameters" shape as the other layouts.
    protocol_steps = normalize_steps(raw_steps)
    if isinstance(raw_labware, list):
        labware_info = normalize_labware(raw_labware)
    else:
        labware_info = raw_labware

    # ---- Build the graph ----
    graph = build_protocol_graph(
        labware_info=labware_info,
        protocol_steps=protocol_steps,
        workstation_name=workstation_name,
        action_resource_mapping=ACTION_RESOURCE_MAPPING,
    )

    # ---- Optional handle validation (warn only) ----
    if validate:
        is_valid, errors = validate_workflow_handles(graph)
        if not is_valid:
            import warnings

            for error in errors:
                warnings.warn(f"句柄校验警告: {error}")

    return graph
def convert_json_to_node_link(
    data: Union[str, PathLike, Dict[str, Any]],
    workstation_name: str = "PRCXi",
) -> Dict[str, Any]:
    """
    Convert JSON input straight into a node-link dictionary.

    Thin wrapper: builds the graph with ``convert_from_json`` (handle
    validation left at its default) and returns the graph's
    ``to_node_link_dict()`` result.

    Args:
        data: Path to a JSON file, a parsed dict, or a JSON string.
        workstation_name: Workstation name, default "PRCXi".

    Returns:
        Dict: The workflow in node-link form.
    """
    return convert_from_json(data, workstation_name).to_node_link_dict()
def convert_json_to_workflow_list(
    data: Union[str, PathLike, Dict[str, Any]],
    workstation_name: str = "PRCXi",
) -> List[Dict[str, Any]]:
    """
    Convert JSON input straight into the workflow list representation.

    Thin wrapper: builds the graph with ``convert_from_json`` (handle
    validation left at its default) and returns the graph's ``to_dict()``
    result.

    Args:
        data: Path to a JSON file, a parsed dict, or a JSON string.
        workstation_name: Workstation name, default "PRCXi".

    Returns:
        List: The workflow nodes as produced by ``WorkflowGraph.to_dict()``.
    """
    return convert_from_json(data, workstation_name).to_dict()
# Underscore-prefixed aliases of the public normalizers, kept for backward compatibility.
_normalize_steps = normalize_steps
_normalize_labware = normalize_labware