添加从报告中提取实际加料量的功能,支持液体进料滴定的自动公式计算

This commit is contained in:
ZiWei
2025-11-15 13:30:22 +08:00
parent d266d21104
commit 7d097b8222
3 changed files with 419 additions and 132 deletions

View File

@@ -779,8 +779,38 @@ class BioyondDispensingStation(BioyondWorkstation):
self.hardware_interface._logger.error(error_msg)
raise BioyondException(error_msg)
def _extract_actuals_from_report(self, report) -> Dict[str, Any]:
data = report.get('data') if isinstance(report, dict) else None
actual_target_weigh = None
actual_volume = None
if data:
extra = data.get('extraProperties') or {}
if isinstance(extra, dict):
for v in extra.values():
obj = None
try:
obj = json.loads(v) if isinstance(v, str) else v
except Exception:
obj = None
if isinstance(obj, dict):
tw = obj.get('targetWeigh')
vol = obj.get('volume')
if tw is not None:
try:
actual_target_weigh = float(tw)
except Exception:
pass
if vol is not None:
try:
actual_volume = float(vol)
except Exception:
pass
return {
'actualTargetWeigh': actual_target_weigh,
'actualVolume': actual_volume
}
# 等待多个任务完成并获取实验报告
def wait_for_multiple_orders_and_get_reports(self,
batch_create_result: str = None,
timeout: int = 7200,
@@ -902,6 +932,7 @@ class BioyondDispensingStation(BioyondWorkstation):
"status": "timeout",
"completion_status": None,
"report": None,
"extracted": None,
"elapsed_time": elapsed_time
})
@@ -940,6 +971,7 @@ class BioyondDispensingStation(BioyondWorkstation):
"status": "completed",
"completion_status": completion_info.get('status'),
"report": report,
"extracted": self._extract_actuals_from_report(report),
"elapsed_time": elapsed_time
})
@@ -959,6 +991,7 @@ class BioyondDispensingStation(BioyondWorkstation):
"status": "error",
"completion_status": completion_info.get('status'),
"report": None,
"extracted": None,
"error": str(e),
"elapsed_time": elapsed_time
})

View File

@@ -1,4 +1,5 @@
import json
import time
import requests
from typing import List, Dict, Any
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
@@ -291,22 +292,39 @@ class BioyondReactionStation(BioyondWorkstation):
def liquid_feeding_titration(
self,
volume_formula: str,
assign_material_name: str,
titration_type: str = "1",
volume_formula: str = None,
x_value: str = None,
feeding_order_data: str = None,
extracted_actuals: str = None,
titration_type: str = "2",
time: str = "90",
torque_variation: int = 2,
temperature: float = 25.00
):
"""液体进料(滴定)
支持两种模式:
1. 直接提供 volume_formula (传统方式)
2. 自动计算公式: 提供 x_value, feeding_order_data, extracted_actuals (新方式)
Args:
volume_formula: 分液公式(μL)
assign_material_name: 物料名称
titration_type: 是否滴定(1=否, 2=是)
volume_formula: 分液公式(μL),如果提供则直接使用,否则自动计算
x_value: 手工输入的x值,格式如 "1-2-3"
feeding_order_data: feeding_order JSON字符串或对象,用于获取m二酐值
extracted_actuals: 从报告提取的实际加料量JSON字符串,包含actualTargetWeigh和actualVolume
titration_type: 是否滴定(1=否, 2=是),默认2
time: 观察时间(分钟)
torque_variation: 是否观察(int类型, 1=否, 2=是)
temperature: 温度(°C)
自动公式模板: 1000*(m二酐-x)*V二酐滴定/m二酐滴定
其中:
- m二酐滴定 = actualTargetWeigh (从extracted_actuals获取)
- V二酐滴定 = actualVolume (从extracted_actuals获取)
- x = x_value (手工输入)
- m二酐 = feeding_order中type为"main_anhydride"的amount值
"""
self.append_to_workflow_sequence('{"web_workflow_name": "Liquid_feeding(titration)"}')
material_id = self.hardware_interface._get_material_id_by_name(assign_material_name)
@@ -316,6 +334,84 @@ class BioyondReactionStation(BioyondWorkstation):
if isinstance(temperature, str):
temperature = float(temperature)
# 如果没有直接提供volume_formula,则自动计算
if not volume_formula and x_value and feeding_order_data and extracted_actuals:
# 1. 解析 feeding_order_data 获取 m二酐
if isinstance(feeding_order_data, str):
try:
feeding_order_data = json.loads(feeding_order_data)
except json.JSONDecodeError as e:
raise ValueError(f"feeding_order_data JSON解析失败: {str(e)}")
# 支持两种格式:
# 格式1: 直接是数组 [{...}, {...}]
# 格式2: 对象包裹 {"feeding_order": [{...}, {...}]}
if isinstance(feeding_order_data, list):
feeding_order_list = feeding_order_data
elif isinstance(feeding_order_data, dict):
feeding_order_list = feeding_order_data.get("feeding_order", [])
else:
raise ValueError("feeding_order_data 必须是数组或包含feeding_order的字典")
# 从feeding_order中找到main_anhydride的amount
m_anhydride = None
for item in feeding_order_list:
if item.get("type") == "main_anhydride":
m_anhydride = item.get("amount")
break
if m_anhydride is None:
raise ValueError("在feeding_order中未找到type为'main_anhydride'的条目")
# 2. 解析 extracted_actuals 获取 actualTargetWeigh 和 actualVolume
if isinstance(extracted_actuals, str):
try:
extracted_actuals_obj = json.loads(extracted_actuals)
except json.JSONDecodeError as e:
raise ValueError(f"extracted_actuals JSON解析失败: {str(e)}")
else:
extracted_actuals_obj = extracted_actuals
# 获取actuals数组
actuals_list = extracted_actuals_obj.get("actuals", [])
if not actuals_list:
# actuals为空,无法自动生成公式,回退到手动模式
print(f"警告: extracted_actuals中actuals数组为空,无法自动生成公式,请手动提供volume_formula")
volume_formula = None # 清空,触发后续的错误检查
else:
# 根据assign_material_name匹配对应的actual数据
# 假设order_code中包含物料名称
matched_actual = None
for actual in actuals_list:
order_code = actual.get("order_code", "")
# 简单匹配:如果order_code包含物料名称
if assign_material_name in order_code:
matched_actual = actual
break
# 如果没有匹配到,使用第一个
if not matched_actual and actuals_list:
matched_actual = actuals_list[0]
if not matched_actual:
raise ValueError("无法从extracted_actuals中获取实际加料量数据")
m_anhydride_titration = matched_actual.get("actualTargetWeigh") # m二酐滴定
v_anhydride_titration = matched_actual.get("actualVolume") # V二酐滴定
if m_anhydride_titration is None or v_anhydride_titration is None:
raise ValueError(f"实际加料量数据不完整: actualTargetWeigh={m_anhydride_titration}, actualVolume={v_anhydride_titration}")
# 3. 构建公式: 1000*(m二酐-x)*V二酐滴定/m二酐滴定
# x_value 格式如 "{{1-2-3}}",保留完整格式(包括花括号)直接替换到公式中
volume_formula = f"1000*({m_anhydride}-{x_value})*{v_anhydride_titration}/{m_anhydride_titration}"
print(f"自动生成滴定公式: {volume_formula}")
print(f" m二酐={m_anhydride}, x={x_value}, V二酐滴定={v_anhydride_titration}, m二酐滴定={m_anhydride_titration}")
elif not volume_formula:
raise ValueError("必须提供 volume_formula 或 (x_value + feeding_order_data + extracted_actuals)")
liquid_step_id = WORKFLOW_STEP_IDS["liquid_feeding_titration"]["liquid"]
observe_step_id = WORKFLOW_STEP_IDS["liquid_feeding_titration"]["observe"]
@@ -343,6 +439,182 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"当前队列长度: {len(self.pending_task_params)}")
return json.dumps({"suc": True})
def _extract_actuals_from_report(self, report) -> Dict[str, Any]:
data = report.get('data') if isinstance(report, dict) else None
actual_target_weigh = None
actual_volume = None
if data:
extra = data.get('extraProperties') or {}
if isinstance(extra, dict):
for v in extra.values():
obj = None
try:
obj = json.loads(v) if isinstance(v, str) else v
except Exception:
obj = None
if isinstance(obj, dict):
tw = obj.get('targetWeigh')
vol = obj.get('volume')
if tw is not None:
try:
actual_target_weigh = float(tw)
except Exception:
pass
if vol is not None:
try:
actual_volume = float(vol)
except Exception:
pass
return {
'actualTargetWeigh': actual_target_weigh,
'actualVolume': actual_volume
}
def extract_actuals_from_batch_reports(self, batch_reports_result: str) -> dict:
print(f"[DEBUG] extract_actuals 收到原始数据: {batch_reports_result[:500]}...") # 打印前500字符
try:
obj = json.loads(batch_reports_result) if isinstance(batch_reports_result, str) else batch_reports_result
if isinstance(obj, dict) and "return_info" in obj:
inner = obj["return_info"]
obj = json.loads(inner) if isinstance(inner, str) else inner
reports = obj.get("reports", []) if isinstance(obj, dict) else []
print(f"[DEBUG] 解析后的 reports 数组长度: {len(reports)}")
except Exception as e:
print(f"[DEBUG] 解析异常: {e}")
reports = []
actuals = []
for i, r in enumerate(reports):
print(f"[DEBUG] 处理 report[{i}]: order_code={r.get('order_code')}, has_extracted={r.get('extracted') is not None}, has_report={r.get('report') is not None}")
order_code = r.get("order_code")
order_id = r.get("order_id")
ex = r.get("extracted")
if isinstance(ex, dict) and (ex.get("actualTargetWeigh") is not None or ex.get("actualVolume") is not None):
print(f"[DEBUG] 从 extracted 字段提取: actualTargetWeigh={ex.get('actualTargetWeigh')}, actualVolume={ex.get('actualVolume')}")
actuals.append({
"order_code": order_code,
"order_id": order_id,
"actualTargetWeigh": ex.get("actualTargetWeigh"),
"actualVolume": ex.get("actualVolume")
})
continue
report = r.get("report")
vals = self._extract_actuals_from_report(report) if report else {"actualTargetWeigh": None, "actualVolume": None}
print(f"[DEBUG] 从 report 字段提取: {vals}")
actuals.append({
"order_code": order_code,
"order_id": order_id,
**vals
})
print(f"[DEBUG] 最终提取的 actuals 数组长度: {len(actuals)}")
result = {
"return_info": json.dumps({"actuals": actuals}, ensure_ascii=False)
}
print(f"[DEBUG] 返回结果: {result}")
return result
def wait_for_multiple_orders_and_get_reports(self, batch_create_result: str = None, timeout: int = 7200, check_interval: int = 10) -> Dict[str, Any]:
try:
timeout = int(timeout) if timeout else 7200
check_interval = int(check_interval) if check_interval else 10
if not batch_create_result or batch_create_result == "":
raise ValueError("batch_create_result为空")
try:
if isinstance(batch_create_result, str) and '[...]' in batch_create_result:
batch_create_result = batch_create_result.replace('[...]', '[]')
result_obj = json.loads(batch_create_result) if isinstance(batch_create_result, str) else batch_create_result
if isinstance(result_obj, dict) and "return_value" in result_obj:
inner = result_obj.get("return_value")
if isinstance(inner, str):
result_obj = json.loads(inner)
elif isinstance(inner, dict):
result_obj = inner
order_codes = result_obj.get("order_codes", [])
order_ids = result_obj.get("order_ids", [])
except Exception as e:
raise ValueError(f"解析batch_create_result失败: {e}")
if not order_codes or not order_ids:
raise ValueError("缺少order_codes或order_ids")
if not isinstance(order_codes, list):
order_codes = [order_codes]
if not isinstance(order_ids, list):
order_ids = [order_ids]
if len(order_codes) != len(order_ids):
raise ValueError("order_codes与order_ids数量不匹配")
total = len(order_codes)
pending = {c: {"order_id": order_ids[i], "completed": False} for i, c in enumerate(order_codes)}
reports = []
start_time = time.time()
while pending:
elapsed_time = time.time() - start_time
if elapsed_time > timeout:
for oc in list(pending.keys()):
reports.append({
"order_code": oc,
"order_id": pending[oc]["order_id"],
"status": "timeout",
"completion_status": None,
"report": None,
"extracted": None,
"elapsed_time": elapsed_time
})
break
completed_round = []
for oc in list(pending.keys()):
oid = pending[oc]["order_id"]
if oc in self.order_completion_status:
info = self.order_completion_status[oc]
try:
rq = json.dumps({"order_id": oid})
rep = self.hardware_interface.order_report(rq)
if not rep:
rep = {"error": "无法获取报告"}
reports.append({
"order_code": oc,
"order_id": oid,
"status": "completed",
"completion_status": info.get('status'),
"report": rep,
"extracted": self._extract_actuals_from_report(rep),
"elapsed_time": elapsed_time
})
completed_round.append(oc)
del self.order_completion_status[oc]
except Exception as e:
reports.append({
"order_code": oc,
"order_id": oid,
"status": "error",
"completion_status": info.get('status') if 'info' in locals() else None,
"report": None,
"extracted": None,
"error": str(e),
"elapsed_time": elapsed_time
})
completed_round.append(oc)
for oc in completed_round:
del pending[oc]
if pending:
time.sleep(check_interval)
completed_count = sum(1 for r in reports if r['status'] == 'completed')
timeout_count = sum(1 for r in reports if r['status'] == 'timeout')
error_count = sum(1 for r in reports if r['status'] == 'error')
final_elapsed_time = time.time() - start_time
summary = {
"total": total,
"completed": completed_count,
"timeout": timeout_count,
"error": error_count,
"elapsed_time": round(final_elapsed_time, 2),
"reports": reports
}
return {
"return_info": json.dumps(summary, ensure_ascii=False)
}
except Exception as e:
raise
def liquid_feeding_beaker(
self,
volume: str = "350",
@@ -580,10 +852,10 @@ class BioyondReactionStation(BioyondWorkstation):
# print(f"\n✅ 任务创建成功: {result}")
# print(f"\n✅ 任务创建成功")
print(f"{'='*60}\n")
# 返回结果,包含合并后的工作流数据和订单参数
return json.dumps({
"success": True,
"success": True,
"result": result,
"merged_workflow": merged_workflow,
"order_params": order_params