feat: 动态获取工作流步骤ID,优化工作流配置

This commit is contained in:
ZiWei
2025-12-31 14:42:43 +08:00
parent 93ac095e0a
commit 5c249e66a2

View File

@@ -2,6 +2,8 @@ import json
import time
import requests
from typing import List, Dict, Any
import json
import requests
from pathlib import Path
from datetime import datetime
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
@@ -91,6 +93,116 @@ class BioyondReactionStation(BioyondWorkstation):
# 用于缓存待处理的时间约束
self.pending_time_constraints = []
# 动态获取工作流步骤ID
self.workflow_step_ids = self._fetch_workflow_step_ids()
def _fetch_workflow_step_ids(self) -> Dict[str, Dict[str, str]]:
"""动态获取工作流步骤ID"""
print("正在从LIMS获取最新工作流步骤ID...")
api_host = API_CONFIG.get("api_host")
api_key = API_CONFIG.get("api_key")
if not api_host or not api_key:
print("API配置缺失使用默认配置")
return WORKFLOW_STEP_IDS
def call_api(endpoint, data=None):
url = f"{api_host}{endpoint}"
payload = {
"apiKey": api_key,
"requestTime": datetime.now().isoformat(),
"data": data if data else {}
}
try:
response = requests.post(url, json=payload, headers={"Content-Type": "application/json"}, timeout=5)
return response.json()
except Exception as e:
print(f"调用API {endpoint} 失败: {e}")
return None
# 1. 获取工作流列表
resp = call_api("/api/lims/workflow/work-flow-list", {"type": 2, "includeDetail": True})
if not resp:
print("无法获取工作流列表,使用默认配置")
return WORKFLOW_STEP_IDS
workflows = resp.get("data", [])
if isinstance(workflows, dict):
if "list" in workflows:
workflows = workflows["list"]
elif "items" in workflows:
workflows = workflows["items"]
if not workflows:
print("工作流列表为空,使用默认配置")
return WORKFLOW_STEP_IDS
new_ids = {}
# 2. 遍历映射表
for internal_name, section_name in WORKFLOW_TO_SECTION_MAP.items():
# 查找对应的工作流对象
wf_obj = next((w for w in workflows if w.get("name") == section_name), None)
if not wf_obj:
# print(f"未找到工作流: {section_name}")
continue
# 获取 subWorkflowId
sub_wf_id = None
if wf_obj.get("subWorkflows"):
sub_wfs = wf_obj.get("subWorkflows")
if len(sub_wfs) > 0:
sub_wf_id = sub_wfs[0].get("id")
if not sub_wf_id:
# print(f"工作流 {section_name} 没有子工作流ID")
continue
# 3. 获取步骤参数
step_resp = call_api("/api/lims/workflow/sub-workflow-step-parameters", sub_wf_id)
if not step_resp or not step_resp.get("data"):
# print(f"无法获取工作流 {section_name} 的步骤参数")
continue
steps_data = step_resp.get("data", {})
step_name_to_id = {}
if isinstance(steps_data, dict):
for s_id, step_list in steps_data.items():
if isinstance(step_list, list):
for step in step_list:
s_name = step.get("name")
if s_name:
step_name_to_id[s_name] = s_id
# 4. 匹配 ACTION_NAMES
target_key = internal_name
normalized_key = internal_name.lower().replace('(', '_').replace(')', '').replace('-', '_')
if internal_name in ACTION_NAMES:
target_key = internal_name
elif normalized_key in ACTION_NAMES:
target_key = normalized_key
elif internal_name.lower() in ACTION_NAMES:
target_key = internal_name.lower()
if target_key in ACTION_NAMES:
new_ids[target_key] = {}
for key, action_display_name in ACTION_NAMES[target_key].items():
step_id = step_name_to_id.get(action_display_name)
if step_id:
new_ids[target_key][key] = step_id
else:
print(f"警告: 工作流 '{section_name}' 中未找到步骤 '{action_display_name}'")
if not new_ids:
print("未能获取任何新的步骤ID使用默认配置")
return WORKFLOW_STEP_IDS
print("成功更新工作流步骤ID")
return new_ids
@property
def workflow_sequence(self) -> str:
@@ -178,7 +290,7 @@ class BioyondReactionStation(BioyondWorkstation):
temperature = float(temperature)
step_id = WORKFLOW_STEP_IDS["reactor_taken_in"]["config"]
step_id = self.workflow_step_ids["reactor_taken_in"]["config"]
reactor_taken_in_params = {
"param_values": {
step_id: {
@@ -228,8 +340,8 @@ class BioyondReactionStation(BioyondWorkstation):
if isinstance(temperature, str):
temperature = float(temperature)
feeding_step_id = WORKFLOW_STEP_IDS["solid_feeding_vials"]["feeding"]
observe_step_id = WORKFLOW_STEP_IDS["solid_feeding_vials"]["observe"]
feeding_step_id = self.workflow_step_ids["solid_feeding_vials"]["feeding"]
observe_step_id = self.workflow_step_ids["solid_feeding_vials"]["observe"]
solid_feeding_vials_params = {
"param_values": {
@@ -266,7 +378,7 @@ class BioyondReactionStation(BioyondWorkstation):
"""液体进料小瓶(非滴定)
Args:
volume_formula: 分液公式(mL)
volume_formula: 分液公式(μL)
assign_material_name: 物料名称
titration_type: 是否滴定(NO=1, YES=2)
time: 观察时间(分钟)
@@ -288,8 +400,8 @@ class BioyondReactionStation(BioyondWorkstation):
if isinstance(temperature, str):
temperature = float(temperature)
liquid_step_id = WORKFLOW_STEP_IDS["liquid_feeding_vials_non_titration"]["liquid"]
observe_step_id = WORKFLOW_STEP_IDS["liquid_feeding_vials_non_titration"]["observe"]
liquid_step_id = self.workflow_step_ids["liquid_feeding_vials_non_titration"]["liquid"]
observe_step_id = self.workflow_step_ids["liquid_feeding_vials_non_titration"]["observe"]
params = {
"param_values": {
@@ -311,7 +423,7 @@ class BioyondReactionStation(BioyondWorkstation):
}
self.pending_task_params.append(params)
print(f"成功添加液体进料小瓶(非滴定)参数: volume={volume_formula}mL, material={assign_material_name}->ID:{material_id}")
print(f"成功添加液体进料小瓶(非滴定)参数: volume={volume_formula}μL, material={assign_material_name}->ID:{material_id}")
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
@@ -329,13 +441,13 @@ class BioyondReactionStation(BioyondWorkstation):
Args:
assign_material_name: 物料名称
volume: 分液量(mL),直接指定体积(可选,如果提供solvents则自动计算)
volume: 分液量(μL),直接指定体积(可选,如果提供solvents则自动计算)
solvents: 溶剂信息的字典或JSON字符串(可选),格式如下:
{
"additional_solvent": 33.55092503597727, # 溶剂体积(mL)
"total_liquid_volume": 48.00916988195499
}
如果提供solvents,则从中提取additional_solvent(单位:mL)
如果提供solvents,则从中提取additional_solvent并转换为μL
titration_type: 是否滴定(NO=1, YES=2)
time: 观察时间(分钟)
torque_variation: 是否观察(NO=1, YES=2)
@@ -366,8 +478,8 @@ class BioyondReactionStation(BioyondWorkstation):
if additional_solvent is None:
raise ValueError("solvents 中没有找到 additional_solvent 字段")
# 直接使用毫升(mL)
volume = str(float(additional_solvent))
# 转换为微升(μL) - 从毫升(mL)转换
volume = str(float(additional_solvent) * 1000)
elif volume is None:
raise ValueError("必须提供 volume 或 solvents 参数之一")
@@ -379,8 +491,8 @@ class BioyondReactionStation(BioyondWorkstation):
if isinstance(temperature, str):
temperature = float(temperature)
liquid_step_id = WORKFLOW_STEP_IDS["liquid_feeding_solvents"]["liquid"]
observe_step_id = WORKFLOW_STEP_IDS["liquid_feeding_solvents"]["observe"]
liquid_step_id = self.workflow_step_ids["liquid_feeding_solvents"]["liquid"]
observe_step_id = self.workflow_step_ids["liquid_feeding_solvents"]["observe"]
params = {
"param_values": {
@@ -402,7 +514,7 @@ class BioyondReactionStation(BioyondWorkstation):
}
self.pending_task_params.append(params)
print(f"成功添加液体进料溶剂参数: material={assign_material_name}->ID:{material_id}, volume={volume}mL")
print(f"成功添加液体进料溶剂参数: material={assign_material_name}->ID:{material_id}, volume={volume}μL")
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
@@ -426,7 +538,7 @@ class BioyondReactionStation(BioyondWorkstation):
Args:
assign_material_name: 物料名称
volume_formula: 分液公式(mL),如果提供则直接使用,否则自动计算
volume_formula: 分液公式(μL),如果提供则直接使用,否则自动计算
x_value: 手工输入的x值,格式如 "1-2-3"
feeding_order_data: feeding_order JSON字符串或对象,用于获取m二酐值
extracted_actuals: 从报告提取的实际加料量JSON字符串,包含actualTargetWeigh和actualVolume
@@ -435,7 +547,7 @@ class BioyondReactionStation(BioyondWorkstation):
torque_variation: 是否观察(NO=1, YES=2)
temperature: 温度(C)
自动公式模板: (m二酐-x)*V二酐滴定/m二酐滴定
自动公式模板: 1000*(m二酐-x)*V二酐滴定/m二酐滴定
其中:
- m二酐滴定 = actualTargetWeigh (从extracted_actuals获取)
- V二酐滴定 = actualVolume (从extracted_actuals获取)
@@ -525,9 +637,9 @@ class BioyondReactionStation(BioyondWorkstation):
if m_anhydride_titration is None or v_anhydride_titration is None:
raise ValueError(f"实际加料量数据不完整: actualTargetWeigh={m_anhydride_titration}, actualVolume={v_anhydride_titration}")
# 3. 构建公式: (m二酐-x)*V二酐滴定/m二酐滴定
# 3. 构建公式: 1000*(m二酐-x)*V二酐滴定/m二酐滴定
# x_value 格式如 "{{1-2-3}}",保留完整格式(包括花括号)直接替换到公式中
volume_formula = f"({m_anhydride}-{x_value})*{v_anhydride_titration}/{m_anhydride_titration}"
volume_formula = f"1000*({m_anhydride}-{x_value})*{v_anhydride_titration}/{m_anhydride_titration}"
print(f"自动生成滴定公式: {volume_formula}")
print(f" m二酐={m_anhydride}, x={x_value}, V二酐滴定={v_anhydride_titration}, m二酐滴定={m_anhydride_titration}")
@@ -535,8 +647,8 @@ class BioyondReactionStation(BioyondWorkstation):
elif not volume_formula:
raise ValueError("必须提供 volume_formula 或 (x_value + feeding_order_data + extracted_actuals)")
liquid_step_id = WORKFLOW_STEP_IDS["liquid_feeding_titration"]["liquid"]
observe_step_id = WORKFLOW_STEP_IDS["liquid_feeding_titration"]["observe"]
liquid_step_id = self.workflow_step_ids["liquid_feeding_titration"]["liquid"]
observe_step_id = self.workflow_step_ids["liquid_feeding_titration"]["observe"]
params = {
"param_values": {
@@ -558,7 +670,7 @@ class BioyondReactionStation(BioyondWorkstation):
}
self.pending_task_params.append(params)
print(f"成功添加液体进料滴定参数: volume={volume_formula}mL, material={assign_material_name}->ID:{material_id}")
print(f"成功添加液体进料滴定参数: volume={volume_formula}μL, material={assign_material_name}->ID:{material_id}")
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
@@ -1009,8 +1121,8 @@ class BioyondReactionStation(BioyondWorkstation):
if isinstance(temperature, str):
temperature = float(temperature)
liquid_step_id = WORKFLOW_STEP_IDS["liquid_feeding_beaker"]["liquid"]
observe_step_id = WORKFLOW_STEP_IDS["liquid_feeding_beaker"]["observe"]
liquid_step_id = self.workflow_step_ids["liquid_feeding_beaker"]["liquid"]
observe_step_id = self.workflow_step_ids["liquid_feeding_beaker"]["observe"]
params = {
"param_values": {
@@ -1032,7 +1144,7 @@ class BioyondReactionStation(BioyondWorkstation):
}
self.pending_task_params.append(params)
print(f"成功添加液体进料烧杯参数: volume={volume}mL, material={assign_material_name}->ID:{material_id}")
print(f"成功添加液体进料烧杯参数: volume={volume}μL, material={assign_material_name}->ID:{material_id}")
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
@@ -1049,7 +1161,7 @@ class BioyondReactionStation(BioyondWorkstation):
Args:
assign_material_name: 物料名称(液体种类)
volume: 分液量(mL)
volume: 分液量(μL)
titration_type: 是否滴定(NO=1, YES=2)
time: 观察时间(分钟)
torque_variation: 是否观察(NO=1, YES=2)
@@ -1070,8 +1182,8 @@ class BioyondReactionStation(BioyondWorkstation):
if isinstance(temperature, str):
temperature = float(temperature)
liquid_step_id = WORKFLOW_STEP_IDS["drip_back"]["liquid"]
observe_step_id = WORKFLOW_STEP_IDS["drip_back"]["observe"]
liquid_step_id = self.workflow_step_ids["drip_back"]["liquid"]
observe_step_id = self.workflow_step_ids["drip_back"]["observe"]
params = {
"param_values": {
@@ -1093,7 +1205,7 @@ class BioyondReactionStation(BioyondWorkstation):
}
self.pending_task_params.append(params)
print(f"成功添加滴回去参数: material={assign_material_name}->ID:{material_id}, volume={volume}mL")
print(f"成功添加滴回去参数: material={assign_material_name}->ID:{material_id}, volume={volume}μL")
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})