Add workflow upload func.

This commit is contained in:
Xuwznln
2025-12-08 19:12:05 +08:00
parent ced961050d
commit 16ee3de086
32 changed files with 811 additions and 222 deletions

View File

@@ -10,6 +10,7 @@ Json = Dict[str, Any]
# ---------------- Graph ----------------
class WorkflowGraph:
"""简单的有向图实现:使用 params 单层参数inputs 内含连线;支持 node-link 导出"""
@@ -21,20 +22,31 @@ class WorkflowGraph:
self.nodes[node_id] = attrs
def add_edge(self, source: str, target: str, **attrs):
# 将 source_port/target_port 映射为服务端期望的 source_handle_key/target_handle_key
source_handle_key = attrs.pop("source_port", "") or attrs.pop("source_handle_key", "")
target_handle_key = attrs.pop("target_port", "") or attrs.pop("target_handle_key", "")
edge = {
"source": source,
"target": target,
"source_node_uuid": source,
"target_node_uuid": target,
"source_handle_key": source_handle_key,
"source_handle_io": attrs.pop("source_handle_io", "source"),
"target_handle_key": target_handle_key,
"target_handle_io": attrs.pop("target_handle_io", "target"),
**attrs
**attrs,
}
self.edges.append(edge)
def _materialize_wiring_into_inputs(self, obj: Any, inputs: Dict[str, Any],
variable_sources: Dict[str, Dict[str, Any]],
target_node_id: str, base_path: List[str]):
def _materialize_wiring_into_inputs(
self,
obj: Any,
inputs: Dict[str, Any],
variable_sources: Dict[str, Dict[str, Any]],
target_node_id: str,
base_path: List[str],
):
has_var = False
def walk(node: Any, path: List[str]):
@@ -48,9 +60,12 @@ class WorkflowGraph:
if src:
key = ".".join(path) # e.g. "params.foo.bar.0"
inputs[key] = {"node": src["node_id"], "output": src.get("output_name", "result")}
self.add_edge(str(src["node_id"]), target_node_id,
source_handle_io=src.get("output_name", "result"),
target_handle_io=key)
self.add_edge(
str(src["node_id"]),
target_node_id,
source_handle_io=src.get("output_name", "result"),
target_handle_io=key,
)
return placeholder
return {k: walk(v, path + [k]) for k, v in node.items()}
if isinstance(node, list):
@@ -60,18 +75,20 @@ class WorkflowGraph:
replaced = walk(obj, base_path[:])
return replaced, has_var
def add_workflow_node(self,
node_id: int,
*,
device_key: Optional[str] = None, # 实例名,如 "ser"
resource_name: Optional[str] = None, # registry key原 device_class
module: Optional[str] = None,
template_name: Optional[str] = None, # 动作/模板名(原 action_key
params: Dict[str, Any],
variable_sources: Dict[str, Dict[str, Any]],
add_ready_if_no_vars: bool = True,
prev_node_id: Optional[int] = None,
**extra_attrs) -> None:
def add_workflow_node(
self,
node_id: int,
*,
device_key: Optional[str] = None, # 实例名,如 "ser"
resource_name: Optional[str] = None, # registry key原 device_class
module: Optional[str] = None,
template_name: Optional[str] = None, # 动作/模板名(原 action_key
params: Dict[str, Any],
variable_sources: Dict[str, Dict[str, Any]],
add_ready_if_no_vars: bool = True,
prev_node_id: Optional[int] = None,
**extra_attrs,
) -> None:
"""添加工作流节点params 单层;自动变量连线与 ready 串联;支持附加属性"""
node_id_str = str(node_id)
inputs: Dict[str, Any] = {}
@@ -87,9 +104,9 @@ class WorkflowGraph:
node_obj = {
"device_key": device_key,
"resource_name": resource_name, # ✅ 新名字
"resource_name": resource_name, # ✅ 新名字
"module": module,
"template_name": template_name, # ✅ 新名字
"template_name": template_name, # ✅ 新名字
"params": params,
"inputs": inputs,
}
@@ -100,13 +117,13 @@ class WorkflowGraph:
def to_dict(self) -> List[Dict[str, Any]]:
result = []
for node_id, attrs in self.nodes.items():
node = {"id": node_id}
node = {"uuid": node_id}
params = dict(attrs.get("parameters", {}) or {})
flat = {k: v for k, v in attrs.items() if k != "parameters"}
flat.update(params)
node.update(flat)
result.append(node)
return sorted(result, key=lambda n: int(n["id"]) if str(n["id"]).isdigit() else n["id"])
return sorted(result, key=lambda n: int(n["uuid"]) if str(n["uuid"]).isdigit() else n["uuid"])
# node-link 导出(含 edges
def to_node_link_dict(self) -> Dict[str, Any]:
@@ -115,12 +132,27 @@ class WorkflowGraph:
node_attrs = attrs.copy()
params = node_attrs.pop("parameters", {}) or {}
node_attrs.update(params)
nodes_list.append({"id": node_id, **node_attrs})
return {"directed": True, "multigraph": False, "graph": {}, "nodes": nodes_list, "edges": self.edges, "links": self.edges}
nodes_list.append({"uuid": node_id, **node_attrs})
return {
"directed": True,
"multigraph": False,
"graph": {},
"nodes": nodes_list,
"edges": self.edges,
"links": self.edges,
}
def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""统一的数据重构函数,根据操作类型自动选择模板"""
def refactor_data(
data: List[Dict[str, Any]],
action_resource_mapping: Optional[Dict[str, str]] = None,
) -> List[Dict[str, Any]]:
"""统一的数据重构函数,根据操作类型自动选择模板
Args:
data: 原始步骤数据列表
action_resource_mapping: action 到 resource_name 的映射字典,可选
"""
refactored_data = []
# 定义操作映射,包含生物实验和有机化学的所有操作
@@ -157,43 +189,67 @@ def refactor_data(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
times = step.get("times", step.get("parameters", {}).get("times", 1))
sub_steps = step.get("steps", step.get("parameters", {}).get("steps", []))
for i in range(int(times)):
sub_data = refactor_data(sub_steps)
sub_data = refactor_data(sub_steps, action_resource_mapping)
refactored_data.extend(sub_data)
continue
# 获取模板名称
template = OPERATION_MAPPING.get(operation)
if not template:
template_name = OPERATION_MAPPING.get(operation)
if not template_name:
# 自动推断模板类型
if operation.lower() in ["transfer", "incubation", "move_labware", "oscillation"]:
template = f"biomek-{operation}"
template_name = f"biomek-{operation}"
else:
template = f"{operation}Protocol"
template_name = f"{operation}Protocol"
# 获取 resource_name
resource_name = f"device.{operation.lower()}"
if action_resource_mapping:
resource_name = action_resource_mapping.get(operation, resource_name)
# 获取步骤编号,生成 name 字段
step_number = step.get("step_number")
name = f"Step {step_number}" if step_number is not None else None
# 创建步骤数据
step_data = {
"template": template,
"template_name": template_name,
"resource_name": resource_name,
"description": step.get("description", step.get("purpose", f"{operation} operation")),
"lab_node_type": "Device",
"parameters": step.get("parameters", step.get("action_args", {})),
"param": step.get("parameters", step.get("action_args", {})),
"footer": f"{template_name}-{resource_name}",
}
if name:
step_data["name"] = name
refactored_data.append(step_data)
return refactored_data
def build_protocol_graph(
labware_info: List[Dict[str, Any]], protocol_steps: List[Dict[str, Any]], workstation_name: str
labware_info: List[Dict[str, Any]],
protocol_steps: List[Dict[str, Any]],
workstation_name: str,
action_resource_mapping: Optional[Dict[str, str]] = None,
) -> WorkflowGraph:
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑"""
"""统一的协议图构建函数,根据设备类型自动选择构建逻辑
Args:
labware_info: labware 信息字典
protocol_steps: 协议步骤列表
workstation_name: 工作站名称
action_resource_mapping: action 到 resource_name 的映射字典,可选
"""
G = WorkflowGraph()
resource_last_writer = {}
protocol_steps = refactor_data(protocol_steps)
protocol_steps = refactor_data(protocol_steps, action_resource_mapping)
# 有机化学&移液站协议图构建
WORKSTATION_ID = workstation_name
# 为所有labware创建资源节点
res_index = 0
for labware_id, item in labware_info.items():
# item_id = item.get("id") or item.get("name", f"item_{uuid.uuid4()}")
node_id = str(uuid.uuid4())
@@ -217,13 +273,16 @@ def build_protocol_graph(
liquid_type = [labware_id]
liquid_volume = [1e5]
res_index += 1
G.add_node(
node_id,
template_name=f"create_resource",
template_name="create_resource",
resource_name="host_node",
name=f"Res {res_index}",
description=description,
lab_node_type=lab_node_type,
params={
footer="create_resource-host_node",
param={
"res_id": labware_id,
"device_id": WORKSTATION_ID,
"class_name": "container",
@@ -234,7 +293,6 @@ def build_protocol_graph(
"liquid_volume": liquid_volume,
"slot_on_deck": "",
},
role=item.get("role", ""),
)
resource_last_writer[labware_id] = f"{node_id}:labware"
@@ -251,7 +309,7 @@ def build_protocol_graph(
last_control_node_id = node_id
# 物料流
params = step.get("parameters", {})
params = step.get("param", {})
input_resources_possible_names = [
"vessel",
"to_vessel",
@@ -299,7 +357,7 @@ def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str):
G = nx.DiGraph()
for node_id, attrs in protocol_graph.nodes.items():
label = attrs.get("description", attrs.get("template", node_id[:8]))
label = attrs.get("description", attrs.get("template_name", node_id[:8]))
G.add_node(node_id, label=label, **attrs)
for edge in protocol_graph.edges:
@@ -331,11 +389,13 @@ def draw_protocol_graph(protocol_graph: WorkflowGraph, output_path: str):
print(f" - Visualization saved to '{output_path}'")
COMPASS = {"n","e","s","w","ne","nw","se","sw","c"}
COMPASS = {"n", "e", "s", "w", "ne", "nw", "se", "sw", "c"}
def _is_compass(port: str) -> bool:
return isinstance(port, str) and port.lower() in COMPASS
def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: str = "LR"):
"""
使用 Graphviz 端口语法绘制协议工作流图。
@@ -350,22 +410,22 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 1) 先用 networkx 搭建有向图,保留端口属性
G = nx.DiGraph()
for node_id, attrs in protocol_graph.nodes.items():
label = attrs.get("description", attrs.get("template", node_id[:8]))
label = attrs.get("description", attrs.get("template_name", node_id[:8]))
# 保留一个干净的“中心标签”,用于放在 record 的中间槽
G.add_node(node_id, _core_label=str(label), **{k:v for k,v in attrs.items() if k not in ("label",)})
G.add_node(node_id, _core_label=str(label), **{k: v for k, v in attrs.items() if k not in ("label",)})
edges_data = []
in_ports_by_node = {} # 收集命名输入端口
in_ports_by_node = {} # 收集命名输入端口
out_ports_by_node = {} # 收集命名输出端口
for edge in protocol_graph.edges:
u = edge["source"]
v = edge["target"]
sp = edge.get("source_port")
tp = edge.get("target_port")
sp = edge.get("source_handle_key") or edge.get("source_port")
tp = edge.get("target_handle_key") or edge.get("target_port")
# 记录到图里(保留原始端口信息)
G.add_edge(u, v, source_port=sp, target_port=tp)
G.add_edge(u, v, source_handle_key=sp, target_handle_key=tp)
edges_data.append((u, v, sp, tp))
# 如果不是 compass就按“命名端口”先归类等会儿给节点造 record
@@ -377,7 +437,9 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 2) 转为 AGraph使用 Graphviz 渲染
A = to_agraph(G)
A.graph_attr.update(rankdir=rankdir, splines="true", concentrate="false", fontsize="10")
A.node_attr.update(shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica")
A.node_attr.update(
shape="box", style="rounded,filled", fillcolor="lightyellow", color="#999999", fontname="Helvetica"
)
A.edge_attr.update(arrowsize="0.8", color="#666666")
# 3) 为需要命名端口的节点设置 record 形状与 label
@@ -386,18 +448,19 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
node = A.get_node(n)
core = G.nodes[n].get("_core_label", n)
in_ports = sorted(in_ports_by_node.get(n, []))
in_ports = sorted(in_ports_by_node.get(n, []))
out_ports = sorted(out_ports_by_node.get(n, []))
# 如果该节点涉及命名端口,则用 record否则保留原 box
if in_ports or out_ports:
def port_fields(ports):
if not ports:
return " " # 必须留一个空槽占位
# 每个端口一个小格子,<p> name
return "|".join(f"<{re.sub(r'[^A-Za-z0-9_:.|-]', '_', p)}> {p}" for p in ports)
left = port_fields(in_ports)
left = port_fields(in_ports)
right = port_fields(out_ports)
# 三栏:左(入) | 中(节点名) | 右(出)
@@ -410,7 +473,7 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 4) 给边设置 headport / tailport
# - 若端口为 compass直接用 compasse.g., headport="e"
# - 若端口为命名端口:使用在 record 中定义的 <port> 名(同名即可)
for (u, v, sp, tp) in edges_data:
for u, v, sp, tp in edges_data:
e = A.get_edge(u, v)
# Graphviz 属性tail 是源head 是目标
@@ -419,13 +482,13 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
e.attr["tailport"] = sp.lower()
else:
# 与 record label 中 <port> 名一致;特殊字符已在 label 中做了清洗
e.attr["tailport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(sp))
e.attr["tailport"] = re.sub(r"[^A-Za-z0-9_:.|-]", "_", str(sp))
if tp:
if _is_compass(tp):
e.attr["headport"] = tp.lower()
else:
e.attr["headport"] = re.sub(r'[^A-Za-z0-9_:.|-]', '_', str(tp))
e.attr["headport"] = re.sub(r"[^A-Za-z0-9_:.|-]", "_", str(tp))
# 可选:若想让边更贴边缘,可设置 constraint/spline 等
# e.attr["arrowhead"] = "vee"
@@ -433,11 +496,14 @@ def draw_protocol_graph_with_ports(protocol_graph, output_path: str, rankdir: st
# 5) 输出
A.draw(output_path, prog="dot")
print(f" - Port-aware workflow rendered to '{output_path}'")
# ---------------- Registry Adapter ----------------
class RegistryAdapter:
"""根据 module 的类名(冒号右侧)反查 registry 的 resource_name原 device_class并抽取参数顺序"""
def __init__(self, device_registry: Dict[str, Any]):
self.device_registry = device_registry or {}
self.module_class_to_resource = self._build_module_class_index()
@@ -455,8 +521,7 @@ class RegistryAdapter:
def resolve_resource_by_classname(self, class_name: str) -> Optional[str]:
if not class_name:
return None
return (self.module_class_to_resource.get(class_name)
or self.module_class_to_resource.get(class_name.lower()))
return self.module_class_to_resource.get(class_name) or self.module_class_to_resource.get(class_name.lower())
def get_device_module(self, resource_name: Optional[str]) -> Optional[str]:
if not resource_name:
@@ -466,9 +531,7 @@ class RegistryAdapter:
def get_actions(self, resource_name: Optional[str]) -> Dict[str, Any]:
if not resource_name:
return {}
return (self.device_registry.get(resource_name, {})
.get("class", {})
.get("action_value_mappings", {})) or {}
return (self.device_registry.get(resource_name, {}).get("class", {}).get("action_value_mappings", {})) or {}
def get_action_schema(self, resource_name: Optional[str], template_name: str) -> Optional[Json]:
return (self.get_actions(resource_name).get(template_name) or {}).get("schema")