diff --git a/unilabos/devices/workstation/bioyond_studio/dispensing_station.py b/unilabos/devices/workstation/bioyond_studio/dispensing_station.py index 561626c4..df24cd6e 100644 --- a/unilabos/devices/workstation/bioyond_studio/dispensing_station.py +++ b/unilabos/devices/workstation/bioyond_studio/dispensing_station.py @@ -779,8 +779,38 @@ class BioyondDispensingStation(BioyondWorkstation): self.hardware_interface._logger.error(error_msg) raise BioyondException(error_msg) + def _extract_actuals_from_report(self, report) -> Dict[str, Any]: + data = report.get('data') if isinstance(report, dict) else None + actual_target_weigh = None + actual_volume = None + if data: + extra = data.get('extraProperties') or {} + if isinstance(extra, dict): + for v in extra.values(): + obj = None + try: + obj = json.loads(v) if isinstance(v, str) else v + except Exception: + obj = None + if isinstance(obj, dict): + tw = obj.get('targetWeigh') + vol = obj.get('volume') + if tw is not None: + try: + actual_target_weigh = float(tw) + except Exception: + pass + if vol is not None: + try: + actual_volume = float(vol) + except Exception: + pass + return { + 'actualTargetWeigh': actual_target_weigh, + 'actualVolume': actual_volume + } - + # 等待多个任务完成并获取实验报告 def wait_for_multiple_orders_and_get_reports(self, batch_create_result: str = None, timeout: int = 7200, @@ -902,6 +932,7 @@ class BioyondDispensingStation(BioyondWorkstation): "status": "timeout", "completion_status": None, "report": None, + "extracted": None, "elapsed_time": elapsed_time }) @@ -940,6 +971,7 @@ class BioyondDispensingStation(BioyondWorkstation): "status": "completed", "completion_status": completion_info.get('status'), "report": report, + "extracted": self._extract_actuals_from_report(report), "elapsed_time": elapsed_time }) @@ -959,6 +991,7 @@ class BioyondDispensingStation(BioyondWorkstation): "status": "error", "completion_status": completion_info.get('status'), "report": None, + "extracted": None, "error": str(e), "elapsed_time": elapsed_time }) diff --git a/unilabos/devices/workstation/bioyond_studio/reaction_station.py b/unilabos/devices/workstation/bioyond_studio/reaction_station.py index 071d99b5..10d0c089 100644 --- a/unilabos/devices/workstation/bioyond_studio/reaction_station.py +++ b/unilabos/devices/workstation/bioyond_studio/reaction_station.py @@ -1,4 +1,5 @@ import json +import time import requests from typing import List, Dict, Any from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation @@ -291,22 +292,39 @@ class BioyondReactionStation(BioyondWorkstation): def liquid_feeding_titration( self, - volume_formula: str, assign_material_name: str, - titration_type: str = "1", + volume_formula: str = None, + x_value: str = None, + feeding_order_data: str = None, + extracted_actuals: str = None, + titration_type: str = "2", time: str = "90", torque_variation: int = 2, temperature: float = 25.00 ): """液体进料(滴定) + 支持两种模式: + 1. 直接提供 volume_formula (传统方式) + 2. 自动计算公式: 提供 x_value, feeding_order_data, extracted_actuals (新方式) + Args: - volume_formula: 分液公式(μL) assign_material_name: 物料名称 - titration_type: 是否滴定(1=否, 2=是) + volume_formula: 分液公式(μL),如果提供则直接使用,否则自动计算 + x_value: 手工输入的x值,格式如 "1-2-3" + feeding_order_data: feeding_order JSON字符串或对象,用于获取m二酐值 + extracted_actuals: 从报告提取的实际加料量JSON字符串,包含actualTargetWeigh和actualVolume + titration_type: 是否滴定(1=否, 2=是),默认2 time: 观察时间(分钟) torque_variation: 是否观察(int类型, 1=否, 2=是) temperature: 温度(°C) + + 自动公式模板: 1000*(m二酐-x)*V二酐滴定/m二酐滴定 + 其中: + - m二酐滴定 = actualTargetWeigh (从extracted_actuals获取) + - V二酐滴定 = actualVolume (从extracted_actuals获取) + - x = x_value (手工输入) + - m二酐 = feeding_order中type为"main_anhydride"的amount值 """ self.append_to_workflow_sequence('{"web_workflow_name": "Liquid_feeding(titration)"}') material_id = self.hardware_interface._get_material_id_by_name(assign_material_name) @@ -316,6 +334,84 @@ class BioyondReactionStation(BioyondWorkstation): if isinstance(temperature, str): temperature = float(temperature) + # 如果没有直接提供volume_formula,则自动计算 + if not volume_formula and x_value and feeding_order_data and extracted_actuals: + # 1. 解析 feeding_order_data 获取 m二酐 + if isinstance(feeding_order_data, str): + try: + feeding_order_data = json.loads(feeding_order_data) + except json.JSONDecodeError as e: + raise ValueError(f"feeding_order_data JSON解析失败: {str(e)}") + + # 支持两种格式: + # 格式1: 直接是数组 [{...}, {...}] + # 格式2: 对象包裹 {"feeding_order": [{...}, {...}]} + if isinstance(feeding_order_data, list): + feeding_order_list = feeding_order_data + elif isinstance(feeding_order_data, dict): + feeding_order_list = feeding_order_data.get("feeding_order", []) + else: + raise ValueError("feeding_order_data 必须是数组或包含feeding_order的字典") + + # 从feeding_order中找到main_anhydride的amount + m_anhydride = None + for item in feeding_order_list: + if item.get("type") == "main_anhydride": + m_anhydride = item.get("amount") + break + + if m_anhydride is None: + raise ValueError("在feeding_order中未找到type为'main_anhydride'的条目") + + # 2. 解析 extracted_actuals 获取 actualTargetWeigh 和 actualVolume + if isinstance(extracted_actuals, str): + try: + extracted_actuals_obj = json.loads(extracted_actuals) + except json.JSONDecodeError as e: + raise ValueError(f"extracted_actuals JSON解析失败: {str(e)}") + else: + extracted_actuals_obj = extracted_actuals + + # 获取actuals数组 + actuals_list = extracted_actuals_obj.get("actuals", []) + if not actuals_list: + # actuals为空,无法自动生成公式,回退到手动模式 + print(f"警告: extracted_actuals中actuals数组为空,无法自动生成公式,请手动提供volume_formula") + volume_formula = None # 清空,触发后续的错误检查 + else: + # 根据assign_material_name匹配对应的actual数据 + # 假设order_code中包含物料名称 + matched_actual = None + for actual in actuals_list: + order_code = actual.get("order_code", "") + # 简单匹配:如果order_code包含物料名称 + if assign_material_name in order_code: + matched_actual = actual + break + + # 如果没有匹配到,使用第一个 + if not matched_actual and actuals_list: + matched_actual = actuals_list[0] + + if not matched_actual: + raise ValueError("无法从extracted_actuals中获取实际加料量数据") + + m_anhydride_titration = matched_actual.get("actualTargetWeigh") # m二酐滴定 + v_anhydride_titration = matched_actual.get("actualVolume") # V二酐滴定 + + if m_anhydride_titration is None or v_anhydride_titration is None: + raise ValueError(f"实际加料量数据不完整: actualTargetWeigh={m_anhydride_titration}, actualVolume={v_anhydride_titration}") + + # 3. 构建公式: 1000*(m二酐-x)*V二酐滴定/m二酐滴定 + # x_value 格式如 "{{1-2-3}}",保留完整格式(包括花括号)直接替换到公式中 + volume_formula = f"1000*({m_anhydride}-{x_value})*{v_anhydride_titration}/{m_anhydride_titration}" + + print(f"自动生成滴定公式: {volume_formula}") + print(f" m二酐={m_anhydride}, x={x_value}, V二酐滴定={v_anhydride_titration}, m二酐滴定={m_anhydride_titration}") + + elif not volume_formula: + raise ValueError("必须提供 volume_formula 或 (x_value + feeding_order_data + extracted_actuals)") + liquid_step_id = WORKFLOW_STEP_IDS["liquid_feeding_titration"]["liquid"] observe_step_id = WORKFLOW_STEP_IDS["liquid_feeding_titration"]["observe"] @@ -343,6 +439,182 @@ class BioyondReactionStation(BioyondWorkstation): print(f"当前队列长度: {len(self.pending_task_params)}") return json.dumps({"suc": True}) + def _extract_actuals_from_report(self, report) -> Dict[str, Any]: + data = report.get('data') if isinstance(report, dict) else None + actual_target_weigh = None + actual_volume = None + if data: + extra = data.get('extraProperties') or {} + if isinstance(extra, dict): + for v in extra.values(): + obj = None + try: + obj = json.loads(v) if isinstance(v, str) else v + except Exception: + obj = None + if isinstance(obj, dict): + tw = obj.get('targetWeigh') + vol = obj.get('volume') + if tw is not None: + try: + actual_target_weigh = float(tw) + except Exception: + pass + if vol is not None: + try: + actual_volume = float(vol) + except Exception: + pass + return { + 'actualTargetWeigh': actual_target_weigh, + 'actualVolume': actual_volume + } + + def extract_actuals_from_batch_reports(self, batch_reports_result: str) -> dict: + print(f"[DEBUG] extract_actuals 收到原始数据: {batch_reports_result[:500]}...") # 打印前500字符 + try: + obj = json.loads(batch_reports_result) if isinstance(batch_reports_result, str) else batch_reports_result + if isinstance(obj, dict) and "return_info" in obj: + inner = obj["return_info"] + obj = json.loads(inner) if isinstance(inner, str) else inner + reports = obj.get("reports", []) if isinstance(obj, dict) else [] + print(f"[DEBUG] 解析后的 reports 数组长度: {len(reports)}") + except Exception as e: + print(f"[DEBUG] 解析异常: {e}") + reports = [] + + actuals = [] + for i, r in enumerate(reports): + print(f"[DEBUG] 处理 report[{i}]: order_code={r.get('order_code')}, has_extracted={r.get('extracted') is not None}, has_report={r.get('report') is not None}") + order_code = r.get("order_code") + order_id = r.get("order_id") + ex = r.get("extracted") + if isinstance(ex, dict) and (ex.get("actualTargetWeigh") is not None or ex.get("actualVolume") is not None): + print(f"[DEBUG] 从 extracted 字段提取: actualTargetWeigh={ex.get('actualTargetWeigh')}, actualVolume={ex.get('actualVolume')}") + actuals.append({ + "order_code": order_code, + "order_id": order_id, + "actualTargetWeigh": ex.get("actualTargetWeigh"), + "actualVolume": ex.get("actualVolume") + }) + continue + report = r.get("report") + vals = self._extract_actuals_from_report(report) if report else {"actualTargetWeigh": None, "actualVolume": None} + print(f"[DEBUG] 从 report 字段提取: {vals}") + actuals.append({ + "order_code": order_code, + "order_id": order_id, + **vals + }) + + print(f"[DEBUG] 最终提取的 actuals 数组长度: {len(actuals)}") + result = { + "return_info": json.dumps({"actuals": actuals}, ensure_ascii=False) + } + print(f"[DEBUG] 返回结果: {result}") + return result + + def wait_for_multiple_orders_and_get_reports(self, batch_create_result: str = None, timeout: int = 7200, check_interval: int = 10) -> Dict[str, Any]: + try: + timeout = int(timeout) if timeout else 7200 + check_interval = int(check_interval) if check_interval else 10 + if not batch_create_result or batch_create_result == "": + raise ValueError("batch_create_result为空") + try: + if isinstance(batch_create_result, str) and '[...]' in batch_create_result: + batch_create_result = batch_create_result.replace('[...]', '[]') + result_obj = json.loads(batch_create_result) if isinstance(batch_create_result, str) else batch_create_result + if isinstance(result_obj, dict) and "return_value" in result_obj: + inner = result_obj.get("return_value") + if isinstance(inner, str): + result_obj = json.loads(inner) + elif isinstance(inner, dict): + result_obj = inner + order_codes = result_obj.get("order_codes", []) + order_ids = result_obj.get("order_ids", []) + except Exception as e: + raise ValueError(f"解析batch_create_result失败: {e}") + if not order_codes or not order_ids: + raise ValueError("缺少order_codes或order_ids") + if not isinstance(order_codes, list): + order_codes = [order_codes] + if not isinstance(order_ids, list): + order_ids = [order_ids] + if len(order_codes) != len(order_ids): + raise ValueError("order_codes与order_ids数量不匹配") + total = len(order_codes) + pending = {c: {"order_id": order_ids[i], "completed": False} for i, c in enumerate(order_codes)} + reports = [] + start_time = time.time() + while pending: + elapsed_time = time.time() - start_time + if elapsed_time > timeout: + for oc in list(pending.keys()): + reports.append({ + "order_code": oc, + "order_id": pending[oc]["order_id"], + "status": "timeout", + "completion_status": None, + "report": None, + "extracted": None, + "elapsed_time": elapsed_time + }) + break + completed_round = [] + for oc in list(pending.keys()): + oid = pending[oc]["order_id"] + if oc in self.order_completion_status: + info = self.order_completion_status[oc] + try: + rq = json.dumps({"order_id": oid}) + rep = self.hardware_interface.order_report(rq) + if not rep: + rep = {"error": "无法获取报告"} + reports.append({ + "order_code": oc, + "order_id": oid, + "status": "completed", + "completion_status": info.get('status'), + "report": rep, + "extracted": self._extract_actuals_from_report(rep), + "elapsed_time": elapsed_time + }) + completed_round.append(oc) + del self.order_completion_status[oc] + except Exception as e: + reports.append({ + "order_code": oc, + "order_id": oid, + "status": "error", + "completion_status": info.get('status') if 'info' in locals() else None, + "report": None, + "extracted": None, + "error": str(e), + "elapsed_time": elapsed_time + }) + completed_round.append(oc) + for oc in completed_round: + del pending[oc] + if pending: + time.sleep(check_interval) + completed_count = sum(1 for r in reports if r['status'] == 'completed') + timeout_count = sum(1 for r in reports if r['status'] == 'timeout') + error_count = sum(1 for r in reports if r['status'] == 'error') + final_elapsed_time = time.time() - start_time + summary = { + "total": total, + "completed": completed_count, + "timeout": timeout_count, + "error": error_count, + "elapsed_time": round(final_elapsed_time, 2), + "reports": reports + } + return { + "return_info": json.dumps(summary, ensure_ascii=False) + } + except Exception as e: + raise + def liquid_feeding_beaker( self, volume: str = "350", @@ -580,10 +852,10 @@ class BioyondReactionStation(BioyondWorkstation): # print(f"\n✅ 任务创建成功: {result}") # print(f"\n✅ 任务创建成功") print(f"{'='*60}\n") - + # 返回结果,包含合并后的工作流数据和订单参数 return json.dumps({ - "success": True, + "success": True, "result": result, "merged_workflow": merged_workflow, "order_params": order_params diff --git a/unilabos/registry/devices/reaction_station_bioyond.yaml b/unilabos/registry/devices/reaction_station_bioyond.yaml index 84918b3d..32627319 100644 --- a/unilabos/registry/devices/reaction_station_bioyond.yaml +++ b/unilabos/registry/devices/reaction_station_bioyond.yaml @@ -4,106 +4,6 @@ reaction_station.bioyond: - reaction_station_bioyond class: action_value_mappings: - auto-create_order: - feedback: {} - goal: {} - goal_default: - json_str: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: - json_str: - type: string - required: - - json_str - type: object - result: {} - required: - - goal - title: create_order参数 - type: object - type: UniLabJsonCommand - auto-merge_workflow_with_parameters: - feedback: {} - goal: {} - goal_default: - json_str: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: - json_str: - type: string - required: - - json_str - type: object - result: {} - required: - - goal - title: merge_workflow_with_parameters参数 - type: object - type: UniLabJsonCommand - auto-process_web_workflows: - feedback: {} - goal: {} - goal_default: - web_workflow_json: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: - web_workflow_json: - type: string - required: - - web_workflow_json - type: object - result: {} - required: - - goal - title: process_web_workflows参数 - type: object - type: UniLabJsonCommand - auto-workflow_step_query: - feedback: {} - goal: {} - goal_default: - workflow_id: null - handles: {} - placeholder_keys: {} - result: {} - schema: - description: '' - properties: - feedback: {} - goal: - properties: - workflow_id: - type: string - required: - - workflow_id - type: object - result: {} - required: - - goal - title: workflow_step_query参数 - type: object - type: UniLabJsonCommand drip_back: feedback: {} goal: @@ -160,6 +60,56 @@ reaction_station.bioyond: title: drip_back参数 type: object type: UniLabJsonCommand + extract_actuals_from_batch_reports: + feedback: {} + goal: + batch_reports_result: batch_reports_result + goal_default: + batch_reports_result: '' + handles: + input: + - data_key: batch_reports_result + data_source: handle + data_type: string + handler_key: BATCH_REPORTS_RESULT + io_type: source + label: Batch Order Completion Reports + output: + - data_key: return_info + data_source: executor + data_type: string + handler_key: ACTUALS_EXTRACTED + io_type: sink + label: Extracted Actuals + result: + return_info: return_info + schema: + description: 从批量任务完成报告中提取每个订单的实际加料量,输出extracted列表。 + properties: + feedback: {} + goal: + properties: + batch_reports_result: + description: 批量任务完成信息JSON字符串或对象,包含reports数组 + type: string + required: + - batch_reports_result + type: object + result: + properties: + return_info: + description: JSON字符串,包含actuals数组,每项含order_code, order_id, actualTargetWeigh, + actualVolume + type: string + required: + - return_info + title: extract_actuals_from_batch_reports结果 + type: object + required: + - goal + title: extract_actuals_from_batch_reports参数 + type: object + type: UniLabJsonCommand liquid_feeding_beaker: feedback: {} goal: @@ -287,22 +237,41 @@ reaction_station.bioyond: feedback: {} goal: assign_material_name: assign_material_name + extracted_actuals: extracted_actuals + feeding_order_data: feeding_order_data temperature: temperature time: time titration_type: titration_type torque_variation: torque_variation volume_formula: volume_formula + x_value: x_value goal_default: assign_material_name: '' - temperature: '' - time: '' - titration_type: '' - torque_variation: '' + extracted_actuals: '' + feeding_order_data: '' + temperature: '25.00' + time: '90' + titration_type: '2' + torque_variation: '2' volume_formula: '' - handles: {} + x_value: '' + handles: + input: + - data_key: extracted_actuals + data_source: handle + data_type: string + handler_key: ACTUALS_EXTRACTED + io_type: source + label: Extracted Actuals From Reports + - data_key: feeding_order_data + data_source: handle + data_type: object + handler_key: feeding_order + io_type: source + label: Feeding Order Data From Calculation Node result: {} schema: - description: 液体进料(滴定) + description: 液体进料(滴定)。支持两种模式:1)直接提供volume_formula;2)自动计算-提供x_value+feeding_order_data+extracted_actuals,系统自动生成公式"1000*(m二酐-x)*V二酐滴定/m二酐滴定" properties: feedback: {} goal: @@ -310,28 +279,37 @@ reaction_station.bioyond: assign_material_name: description: 物料名称 type: string + extracted_actuals: + description: 从报告提取的实际加料量JSON字符串,包含actualTargetWeigh(m二酐滴定)和actualVolume(V二酐滴定) + type: string + feeding_order_data: + description: 'feeding_order JSON对象,用于获取m二酐值(type为main_anhydride的amount)。示例: + {"feeding_order": [{"type": "main_anhydride", "amount": 1.915}]}' + type: string temperature: - description: 温度设定(°C) + default: '25.00' + description: 温度设定(°C),默认25.00 type: string time: - description: 观察时间(分钟) + default: '90' + description: 观察时间(分钟),默认90 type: string titration_type: - description: 是否滴定(1=否, 2=是) + default: '2' + description: 是否滴定(1=否, 2=是),默认2 type: string torque_variation: - description: 是否观察 (1=否, 2=是) + default: '2' + description: 是否观察 (1=否, 2=是),默认2 type: string volume_formula: - description: 分液公式(μL) + description: 分液公式(μL)。可直接提供固定公式,或留空由系统根据x_value、feeding_order_data、extracted_actuals自动生成 + type: string + x_value: + description: 公式中的x值,手工输入,格式为"{{1-2-3}}"(包含双花括号)。用于自动公式计算 type: string required: - - volume_formula - assign_material_name - - time - - torque_variation - - titration_type - - temperature type: object result: {} required: @@ -546,7 +524,9 @@ reaction_station.bioyond: module: unilabos.devices.workstation.bioyond_studio.reaction_station:BioyondReactionStation protocol_type: [] status_types: - workflow_sequence: String + all_workflows: dict + bioyond_status: dict + workstation_status: dict type: python config_info: [] description: Bioyond反应站 @@ -558,18 +538,20 @@ reaction_station.bioyond: config: type: object deck: - type: string - protocol_type: - type: string + type: object required: [] type: object data: properties: - workflow_sequence: - items: - type: string - type: array + all_workflows: + type: object + bioyond_status: + type: object + workstation_status: + type: object required: - - workflow_sequence + - bioyond_status + - all_workflows + - workstation_status type: object version: 1.0.0