diff --git a/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py b/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py index 5c7f9afe..afd515aa 100644 --- a/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py +++ b/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py @@ -192,6 +192,23 @@ class BioyondV1RPC(BaseRequest): return [] return str(response.get("data", {})) + def material_type_list(self) -> list: + """查询物料类型列表 + + 返回值: + list: 物料类型数组,失败返回空列表 + """ + response = self.post( + url=f'{self.host}/api/lims/storage/material-type-list', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": {}, + }) + if not response or response['code'] != 1: + return [] + return response.get("data", []) + def material_inbound(self, material_id: str, location_id: str) -> dict: """ 描述:指定库位入库一个物料 @@ -221,6 +238,26 @@ class BioyondV1RPC(BaseRequest): # 入库成功时,即使没有 data 字段,也返回成功标识 return response.get("data") or {"success": True} + def batch_inbound(self, inbound_items: List[Dict[str, Any]]) -> int: + """批量入库物料 + + 参数: + inbound_items: 入库条目列表,每项包含 materialId/locationId/quantity 等 + + 返回值: + int: 成功返回1,失败返回0 + """ + response = self.post( + url=f'{self.host}/api/lims/storage/batch-inbound', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": inbound_items, + }) + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + def delete_material(self, material_id: str) -> dict: """ 描述:删除尚未入库的物料 @@ -289,6 +326,66 @@ class BioyondV1RPC(BaseRequest): return None return response + def batch_outbound(self, outbound_items: List[Dict[str, Any]]) -> int: + """批量出库物料 + + 参数: + outbound_items: 出库条目列表,每项包含 materialId/locationId/quantity 等 + + 返回值: + int: 成功返回1,失败返回0 + """ + response = self.post( + url=f'{self.host}/api/lims/storage/batch-outbound', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": outbound_items, + }) + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + + def material_info(self, material_id: str) -> dict: + """查询物料详情 + + 参数: + material_id: 物料ID + + 返回值: + dict: 物料信息字典,失败返回空字典 + """ + response = self.post( + url=f'{self.host}/api/lims/storage/material-info', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": material_id, + }) + if not response or response['code'] != 1: + return {} + return response.get("data", {}) + + def reset_location(self, location_id: str) -> int: + """复位库位 + + 参数: + location_id: 库位ID + + 返回值: + int: 成功返回1,失败返回0 + """ + response = self.post( + url=f'{self.host}/api/lims/storage/reset-location', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": location_id, + }) + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + # ==================== 工作流查询相关接口 ==================== def query_workflow(self, json_str: str) -> dict: @@ -332,6 +429,66 @@ class BioyondV1RPC(BaseRequest): return {} return response.get("data", {}) + def split_workflow_list(self, params: Dict[str, Any]) -> dict: + """查询可拆分工作流列表 + + 参数: + params: 查询条件参数 + + 返回值: + dict: 返回数据字典,失败返回空字典 + """ + response = self.post( + url=f'{self.host}/api/lims/workflow/split-workflow-list', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": params, + }) + if not response or response['code'] != 1: + return {} + return response.get("data", {}) + + def merge_workflow(self, data: Dict[str, Any]) -> dict: + """合并工作流(无参数版) + + 参数: + data: 合并请求体,包含待合并的子工作流信息 + + 返回值: + dict: 合并结果,失败返回空字典 + """ + response = self.post( + url=f'{self.host}/api/lims/workflow/merge-workflow', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": data, + }) + if not response or response['code'] != 1: + return {} + return response.get("data", {}) + + def merge_workflow_with_parameters(self, data: Dict[str, Any]) -> dict: + """合并工作流(携带参数) + + 参数: + data: 合并请求体,包含 name、workflows 以及 stepParameters 等 + + 返回值: + dict: 合并结果,失败返回空字典 + """ + response = self.post( + url=f'{self.host}/api/lims/workflow/merge-workflow-with-parameters', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": data, + }) + if not response or response['code'] != 1: + return {} + return response.get("data", {}) + def validate_workflow_parameters(self, workflows: List[Dict[str, Any]]) -> Dict[str, Any]: """验证工作流参数格式""" try: @@ -494,35 +651,34 @@ class BioyondV1RPC(BaseRequest): return {} return response.get("data", {}) - def order_report(self, json_str: str) -> dict: - """ - 描述:查询某个任务明细 - json_str 格式为JSON字符串: - '{"order_id": "order123"}' - """ - try: - data = json.loads(json_str) - order_id = data.get("order_id", "") - except json.JSONDecodeError: - return {} + def order_report(self, order_id: str) -> dict: + """查询订单报告 + 参数: + order_id: 订单ID + + 返回值: + dict: 报告数据,失败返回空字典 + """ response = self.post( - url=f'{self.host}/api/lims/order/project-order-report', + url=f'{self.host}/api/lims/order/order-report', params={ "apiKey": self.api_key, "requestTime": self.get_current_time_iso8601(), "data": order_id, }) - if not response or response['code'] != 1: return {} return response.get("data", {}) def order_takeout(self, json_str: str) -> int: - """ - 描述:取出任务产物 - json_str 格式为JSON字符串: - '{"order_id": "order123", "preintake_id": "preintake123"}' + """取出任务产物 + + 参数: + json_str: JSON字符串,包含 order_id 与 preintake_id + + 返回值: + int: 成功返回1,失败返回0 """ try: data = json.loads(json_str) @@ -545,14 +701,15 @@ class BioyondV1RPC(BaseRequest): return 0 return response.get("code", 0) + def sample_waste_removal(self, order_id: str) -> dict: - """ - 样品/废料取出接口 + """样品/废料取出 参数: - - order_id: 订单ID + order_id: 订单ID - 返回: 取出结果 + 返回值: + dict: 取出结果,失败返回空字典 """ params = {"orderId": order_id} @@ -574,10 +731,13 @@ class BioyondV1RPC(BaseRequest): return response.get("data", {}) def cancel_order(self, json_str: str) -> bool: - """ - 描述:取消指定任务 - json_str 格式为JSON字符串: - '{"order_id": "order123"}' + """取消指定任务 + + 参数: + json_str: JSON字符串,包含 order_id + + 返回值: + bool: 成功返回 True,失败返回 False """ try: data = json.loads(json_str) @@ -597,6 +757,126 @@ class BioyondV1RPC(BaseRequest): return False return True + def cancel_experiment(self, order_id: str) -> int: + """取消指定实验 + + 参数: + order_id: 订单ID + + 返回值: + int: 成功返回1,失败返回0 + """ + response = self.post( + url=f'{self.host}/api/lims/order/cancel-experiment', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": order_id, + }) + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + + def batch_cancel_experiment(self, order_ids: List[str]) -> int: + """批量取消实验 + + 参数: + order_ids: 订单ID列表 + + 返回值: + int: 成功返回1,失败返回0 + """ + response = self.post( + url=f'{self.host}/api/lims/order/batch-cancel-experiment', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": order_ids, + }) + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + + def gantts_by_order_id(self, order_id: str) -> dict: + """查询订单甘特图数据 + + 参数: + order_id: 订单ID + + 返回值: + dict: 甘特数据,失败返回空字典 + """ + response = self.post( + url=f'{self.host}/api/lims/order/gantts-by-order-id', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": order_id, + }) + if not response or response['code'] != 1: + return {} + return response.get("data", {}) + + def simulation_gantt_by_order_id(self, order_id: str) -> dict: + """查询订单模拟甘特图数据 + + 参数: + order_id: 订单ID + + 返回值: + dict: 模拟甘特数据,失败返回空字典 + """ + response = self.post( + url=f'{self.host}/api/lims/order/simulation-gantt-by-order-id', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": order_id, + }) + if not response or response['code'] != 1: + return {} + return response.get("data", {}) + + def reset_order_status(self, order_id: str) -> int: + """复位订单状态 + + 参数: + order_id: 订单ID + + 返回值: + int: 成功返回1,失败返回0 + """ + response = self.post( + url=f'{self.host}/api/lims/order/reset-order-status', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": order_id, + }) + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + + def gantt_with_simulation_by_order_id(self, order_id: str) -> dict: + """查询订单甘特与模拟联合数据 + + 参数: + order_id: 订单ID + + 返回值: + dict: 联合数据,失败返回空字典 + """ + response = self.post( + url=f'{self.host}/api/lims/order/gantt-with-simulation-by-order-id', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": order_id, + }) + if not response or response['code'] != 1: + return {} + return response.get("data", {}) + # ==================== 设备管理相关接口 ==================== def device_list(self, json_str: str = "") -> list: @@ -628,9 +908,13 @@ class BioyondV1RPC(BaseRequest): return response.get("data", []) def device_operation(self, json_str: str) -> int: - """ - 描述:操作设备 - json_str 格式为JSON字符串 + """设备操作 + + 参数: + json_str: JSON字符串,包含 device_no/operationType/operationParams + + 返回值: + int: 成功返回1,失败返回0 """ try: data = json.loads(json_str) @@ -643,7 +927,7 @@ class BioyondV1RPC(BaseRequest): return 0 response = self.post( - url=f'{self.host}/api/lims/device/device-operation', + url=f'{self.host}/api/lims/device/execute-operation', params={ "apiKey": self.api_key, "requestTime": self.get_current_time_iso8601(), @@ -654,9 +938,30 @@ class BioyondV1RPC(BaseRequest): return 0 return response.get("code", 0) + def reset_devices(self) -> int: + """复位设备集合 + + 返回值: + int: 成功返回1,失败返回0 + """ + response = self.post( + url=f'{self.host}/api/lims/device/reset-devices', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + }) + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + # ==================== 调度器相关接口 ==================== def scheduler_status(self) -> dict: + """查询调度器状态 + + 返回值: + dict: 包含 schedulerStatus/hasTask/creationTime 等 + """ response = self.post( url=f'{self.host}/api/lims/scheduler/scheduler-status', params={ @@ -669,7 +974,7 @@ class BioyondV1RPC(BaseRequest): return response.get("data", {}) def scheduler_start(self) -> int: - """描述:启动调度器""" + """启动调度器""" response = self.post( url=f'{self.host}/api/lims/scheduler/start', params={ @@ -682,7 +987,7 @@ class BioyondV1RPC(BaseRequest): return response.get("code", 0) def scheduler_pause(self) -> int: - """描述:暂停调度器""" + """暂停调度器""" response = self.post( url=f'{self.host}/api/lims/scheduler/pause', params={ @@ -694,8 +999,21 @@ class BioyondV1RPC(BaseRequest): return 0 return response.get("code", 0) + def scheduler_smart_pause(self) -> int: + """智能暂停调度器""" + response = self.post( + url=f'{self.host}/api/lims/scheduler/smart-pause', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + }) + + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + def scheduler_continue(self) -> int: - """描述:继续调度器""" + """继续调度器""" response = self.post( url=f'{self.host}/api/lims/scheduler/continue', params={ @@ -708,7 +1026,7 @@ class BioyondV1RPC(BaseRequest): return response.get("code", 0) def scheduler_stop(self) -> int: - """描述:停止调度器""" + """停止调度器""" response = self.post( url=f'{self.host}/api/lims/scheduler/stop', params={ @@ -721,7 +1039,7 @@ class BioyondV1RPC(BaseRequest): return response.get("code", 0) def scheduler_reset(self) -> int: - """描述:复位调度器""" + """复位调度器""" response = self.post( url=f'{self.host}/api/lims/scheduler/reset', params={ @@ -733,6 +1051,26 @@ class BioyondV1RPC(BaseRequest): return 0 return response.get("code", 0) + def scheduler_reply_error_handling(self, data: Dict[str, Any]) -> int: + """调度错误处理回复 + + 参数: + data: 错误处理参数 + + 返回值: + int: 成功返回1,失败返回0 + """ + response = self.post( + url=f'{self.host}/api/lims/scheduler/reply-error-handling', + params={ + "apiKey": self.api_key, + "requestTime": self.get_current_time_iso8601(), + "data": data, + }) + if not response or response['code'] != 1: + return 0 + return response.get("code", 0) + # ==================== 辅助方法 ==================== def _load_material_cache(self): @@ -796,3 +1134,23 @@ class BioyondV1RPC(BaseRequest): def get_available_materials(self): """获取所有可用的材料名称列表""" return list(self.material_cache.keys()) + + def get_scheduler_state(self) -> Optional[MachineState]: + """将调度状态字符串映射为枚举值 + + 返回值: + Optional[MachineState]: 映射后的枚举,失败返回 None + """ + data = self.scheduler_status() + if not isinstance(data, dict): + return None + status = data.get("schedulerStatus") + mapping = { + "Init": MachineState.INITIAL, + "Stop": MachineState.STOPPED, + "Running": MachineState.RUNNING, + "Pause": MachineState.PAUSED, + "ErrorPause": MachineState.ERROR_PAUSED, + "ErrorStop": MachineState.ERROR_STOPPED, + } + return mapping.get(status) diff --git a/unilabos/devices/workstation/bioyond_studio/dispensing_station.py b/unilabos/devices/workstation/bioyond_studio/dispensing_station.py index 561626c4..e80a1ef6 100644 --- a/unilabos/devices/workstation/bioyond_studio/dispensing_station.py +++ b/unilabos/devices/workstation/bioyond_studio/dispensing_station.py @@ -1,11 +1,17 @@ from datetime import datetime import json import time -from typing import Optional, Dict, Any +from typing import Optional, Dict, Any, List +import requests +from unilabos.devices.workstation.bioyond_studio.config import API_CONFIG from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondException from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation - +from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, BaseROS2DeviceNode +import json +import sys +from pathlib import Path +import importlib class BioyondDispensingStation(BioyondWorkstation): def __init__( @@ -28,6 +34,108 @@ class BioyondDispensingStation(BioyondWorkstation): # 用于跟踪任务完成状态的字典: {orderCode: {status, order_id, timestamp}} self.order_completion_status = {} + def _post_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]: + """项目接口通用POST调用 + + 参数: + endpoint: 接口路径(例如 /api/lims/order/brief-step-paramerers) + data: 请求体中的 data 字段内容 + + 返回: + dict: 服务端响应,失败时返回 {code:0,message,...} + """ + request_data = { + "apiKey": API_CONFIG["api_key"], + "requestTime": self.hardware_interface.get_current_time_iso8601(), + "data": data + } + try: + response = requests.post( + f"{self.hardware_interface.host}{endpoint}", + json=request_data, + headers={"Content-Type": "application/json"}, + timeout=30 + ) + result = response.json() + return result if isinstance(result, dict) else {"code": 0, "message": "非JSON响应"} + except json.JSONDecodeError: + return {"code": 0, "message": "非JSON响应"} + except requests.exceptions.Timeout: + return {"code": 0, "message": "请求超时"} + except requests.exceptions.RequestException as e: + return {"code": 0, "message": str(e)} + + def _delete_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]: + """项目接口通用DELETE调用 + + 参数: + endpoint: 接口路径(例如 /api/lims/order/workflows) + data: 请求体中的 data 字段内容 + + 返回: + dict: 服务端响应,失败时返回 {code:0,message,...} + """ + request_data = { + "apiKey": API_CONFIG["api_key"], + "requestTime": self.hardware_interface.get_current_time_iso8601(), + "data": data + } + try: + response = requests.delete( + f"{self.hardware_interface.host}{endpoint}", + json=request_data, + headers={"Content-Type": "application/json"}, + timeout=30 + ) + result = response.json() + return result if isinstance(result, dict) else {"code": 0, "message": "非JSON响应"} + except json.JSONDecodeError: + return {"code": 0, "message": "非JSON响应"} + except requests.exceptions.Timeout: + return {"code": 0, "message": "请求超时"} + except requests.exceptions.RequestException as e: + return {"code": 0, "message": str(e)} + + def compute_experiment_design( + self, + ratio: dict, + wt_percent: str = "0.25", + m_tot: str = "70", + titration_percent: str = "0.03", + ) -> dict: + try: + if isinstance(ratio, str): + try: + ratio = json.loads(ratio) + except Exception: + ratio = {} + root = str(Path(__file__).resolve().parents[3]) + if root not in sys.path: + sys.path.append(root) + try: + mod = importlib.import_module("tem.compute") + except Exception as e: + raise BioyondException(f"无法导入计算模块: {e}") + try: + wp = float(wt_percent) if isinstance(wt_percent, str) else wt_percent + mt = float(m_tot) if isinstance(m_tot, str) else m_tot + tp = float(titration_percent) if isinstance(titration_percent, str) else titration_percent + except Exception as e: + raise BioyondException(f"参数解析失败: {e}") + res = mod.generate_experiment_design(ratio=ratio, wt_percent=wp, m_tot=mt, titration_percent=tp) + out = { + "solutions": res.get("solutions", []), + "titration": res.get("titration", {}), + "solvents": res.get("solvents", {}), + "feeding_order": res.get("feeding_order", []), + "return_info": json.dumps(res, ensure_ascii=False) + } + return out + except BioyondException: + raise + except Exception as e: + raise BioyondException(str(e)) + # 90%10%小瓶投料任务创建方法 def create_90_10_vial_feeding_task(self, order_name: str = None, @@ -649,6 +757,40 @@ class BioyondDispensingStation(BioyondWorkstation): self.hardware_interface._logger.error(error_msg) raise BioyondException(error_msg) + def brief_step_parameters(self, data: Dict[str, Any]) -> Dict[str, Any]: + """获取简要步骤参数(站点项目接口) + + 参数: + data: 查询参数字典 + + 返回值: + dict: 接口返回数据 + """ + return self._post_project_api("/api/lims/order/brief-step-paramerers", data) + + def project_order_report(self, order_id: str) -> Dict[str, Any]: + """查询项目端订单报告(兼容旧路径) + + 参数: + order_id: 订单ID + + 返回值: + dict: 报告数据 + """ + return self._post_project_api("/api/lims/order/project-order-report", order_id) + + def workflow_sample_locations(self, workflow_id: str) -> Dict[str, Any]: + """查询工作流样品库位(站点项目接口) + + 参数: + workflow_id: 工作流ID + + 返回值: + dict: 位置信息数据 + """ + return self._post_project_api("/api/lims/storage/workflow-sample-locations", workflow_id) + + # 批量创建90%10%小瓶投料任务 def batch_create_90_10_vial_feeding_tasks(self, titration, @@ -779,8 +921,38 @@ class BioyondDispensingStation(BioyondWorkstation): self.hardware_interface._logger.error(error_msg) raise BioyondException(error_msg) + def _extract_actuals_from_report(self, report) -> Dict[str, Any]: + data = report.get('data') if isinstance(report, dict) else None + actual_target_weigh = None + actual_volume = None + if data: + extra = data.get('extraProperties') or {} + if isinstance(extra, dict): + for v in extra.values(): + obj = None + try: + obj = json.loads(v) if isinstance(v, str) else v + except Exception: + obj = None + if isinstance(obj, dict): + tw = obj.get('targetWeigh') + vol = obj.get('volume') + if tw is not None: + try: + actual_target_weigh = float(tw) + except Exception: + pass + if vol is not None: + try: + actual_volume = float(vol) + except Exception: + pass + return { + 'actualTargetWeigh': actual_target_weigh, + 'actualVolume': actual_volume + } - + # 等待多个任务完成并获取实验报告 def wait_for_multiple_orders_and_get_reports(self, batch_create_result: str = None, timeout: int = 7200, @@ -902,6 +1074,7 @@ class BioyondDispensingStation(BioyondWorkstation): "status": "timeout", "completion_status": None, "report": None, + "extracted": None, "elapsed_time": elapsed_time }) @@ -921,8 +1094,7 @@ class BioyondDispensingStation(BioyondWorkstation): # 获取实验报告 try: - report_query = json.dumps({"order_id": order_id}) - report = self.hardware_interface.order_report(report_query) + report = self.project_order_report(order_id) if not report: self.hardware_interface._logger.warning( @@ -940,6 +1112,7 @@ class BioyondDispensingStation(BioyondWorkstation): "status": "completed", "completion_status": completion_info.get('status'), "report": report, + "extracted": self._extract_actuals_from_report(report), "elapsed_time": elapsed_time }) @@ -959,6 +1132,7 @@ class BioyondDispensingStation(BioyondWorkstation): "status": "error", "completion_status": completion_info.get('status'), "report": None, + "extracted": None, "error": str(e), "elapsed_time": elapsed_time }) @@ -1052,6 +1226,266 @@ class BioyondDispensingStation(BioyondWorkstation): self.hardware_interface._logger.error(f"处理任务完成报送失败: {e}") return {"processed": False, "error": str(e)} + def transfer_materials_to_reaction_station( + self, + target_device_id: str, + transfer_groups: list + ) -> dict: + """ + 将配液站完成的物料转移到指定反应站的堆栈库位 + 支持多组转移任务,每组包含物料名称、目标堆栈和目标库位 + + Args: + target_device_id: 目标反应站设备ID(所有转移组使用同一个设备) + transfer_groups: 转移任务组列表,每组包含: + - materials: 物料名称(字符串,将通过RPC查询) + - target_stack: 目标堆栈名称(如"堆栈1左") + - target_sites: 目标库位(如"A01") + + Returns: + dict: 转移结果 + { + "success": bool, + "total_groups": int, + "successful_groups": int, + "failed_groups": int, + "target_device_id": str, + "details": [...] + } + """ + try: + # 验证参数 + if not target_device_id: + raise ValueError("目标设备ID不能为空") + + if not transfer_groups: + raise ValueError("转移任务组列表不能为空") + + if not isinstance(transfer_groups, list): + raise ValueError("transfer_groups必须是列表类型") + + # 标准化设备ID格式: 确保以 /devices/ 开头 + if not target_device_id.startswith("/devices/"): + if target_device_id.startswith("/"): + target_device_id = f"/devices{target_device_id}" + else: + target_device_id = f"/devices/{target_device_id}" + + self.hardware_interface._logger.info( + f"目标设备ID标准化为: {target_device_id}" + ) + + self.hardware_interface._logger.info( + f"开始执行批量物料转移: {len(transfer_groups)}组任务 -> {target_device_id}" + ) + + from .config import WAREHOUSE_MAPPING + results = [] + successful_count = 0 + failed_count = 0 + + for idx, group in enumerate(transfer_groups, 1): + try: + # 提取参数 + material_name = group.get("materials", "") + target_stack = group.get("target_stack", "") + target_sites = group.get("target_sites", "") + + # 验证必填参数 + if not material_name: + raise ValueError(f"第{idx}组: 物料名称不能为空") + if not target_stack: + raise ValueError(f"第{idx}组: 目标堆栈不能为空") + if not target_sites: + raise ValueError(f"第{idx}组: 目标库位不能为空") + + self.hardware_interface._logger.info( + f"处理第{idx}组转移: {material_name} -> " + f"{target_device_id}/{target_stack}/{target_sites}" + ) + + # 通过物料名称从deck获取ResourcePLR对象 + try: + material_resource = self.deck.get_resource(material_name) + if not material_resource: + raise ValueError(f"在deck中未找到物料: {material_name}") + + self.hardware_interface._logger.info( + f"从deck获取到物料 {material_name}: {material_resource}" + ) + except Exception as e: + raise ValueError( + f"获取物料 {material_name} 失败: {str(e)},请确认物料已正确加载到deck中" + ) + + # 验证目标堆栈是否存在 + if target_stack not in WAREHOUSE_MAPPING: + raise ValueError( + f"未知的堆栈名称: {target_stack}," + f"可选值: {list(WAREHOUSE_MAPPING.keys())}" + ) + + # 验证库位是否有效 + stack_sites = WAREHOUSE_MAPPING[target_stack].get("site_uuids", {}) + if target_sites not in stack_sites: + raise ValueError( + f"库位 {target_sites} 不存在于堆栈 {target_stack} 中," + f"可选库位: {list(stack_sites.keys())}" + ) + + # 获取目标库位的UUID + target_site_uuid = stack_sites[target_sites] + if not target_site_uuid: + raise ValueError( + f"库位 {target_sites} 的 UUID 未配置,请在 WAREHOUSE_MAPPING 中完善" + ) + + # 目标位点(包含UUID) + future = ROS2DeviceNode.run_async_func( + self._ros_node.get_resource_with_dir, + True, + **{ + "resource_id": f"/reaction_station_bioyond/Bioyond_Deck/{target_stack}", + "with_children": True, + }, + ) + # 等待异步完成后再获取结果 + if not future: + raise ValueError(f"获取目标堆栈资源future无效: {target_stack}") + while not future.done(): + time.sleep(0.1) + target_site_resource = future.result() + + # 调用父类的 transfer_resource_to_another 方法 + # 传入ResourcePLR对象和目标位点资源 + future = self.transfer_resource_to_another( + resource=[material_resource], + mount_resource=[target_site_resource], + sites=[target_sites], + mount_device_id=target_device_id + ) + + # 等待异步任务完成(轮询直到完成,再取结果) + if future: + try: + while not future.done(): + time.sleep(0.1) + future.result() + self.hardware_interface._logger.info( + f"异步转移任务已完成: {material_name}" + ) + except Exception as e: + raise ValueError(f"转移任务执行失败: {str(e)}") + + self.hardware_interface._logger.info( + f"第{idx}组转移成功: {material_name} -> " + f"{target_device_id}/{target_stack}/{target_sites}" + ) + + successful_count += 1 + results.append({ + "group_index": idx, + "success": True, + "material_name": material_name, + "target_stack": target_stack, + "target_site": target_sites, + "message": "转移成功" + }) + + except Exception as e: + error_msg = f"第{idx}组转移失败: {str(e)}" + self.hardware_interface._logger.error(error_msg) + failed_count += 1 + results.append({ + "group_index": idx, + "success": False, + "material_name": group.get("materials", ""), + "error": str(e) + }) + + # 返回汇总结果 + return { + "success": failed_count == 0, + "total_groups": len(transfer_groups), + "successful_groups": successful_count, + "failed_groups": failed_count, + "target_device_id": target_device_id, + "details": results, + "message": f"完成 {len(transfer_groups)} 组转移任务到 {target_device_id}: " + f"{successful_count} 成功, {failed_count} 失败" + } + + except Exception as e: + error_msg = f"批量转移物料失败: {str(e)}" + self.hardware_interface._logger.error(error_msg) + return { + "success": False, + "total_groups": len(transfer_groups) if transfer_groups else 0, + "successful_groups": 0, + "failed_groups": len(transfer_groups) if transfer_groups else 0, + "target_device_id": target_device_id if target_device_id else "", + "error": error_msg + } + + def query_resource_by_name(self, material_name: str): + """ + 通过物料名称查询资源对象(适用于Bioyond系统) + + Args: + material_name: 物料名称 + + Returns: + 物料ID或None + """ + try: + # Bioyond系统使用material_cache存储物料信息 + if not hasattr(self.hardware_interface, 'material_cache'): + self.hardware_interface._logger.error( + "hardware_interface没有material_cache属性" + ) + return None + + material_cache = self.hardware_interface.material_cache + + self.hardware_interface._logger.info( + f"查询物料 '{material_name}', 缓存中共有 {len(material_cache)} 个物料" + ) + + # 调试: 打印前几个物料信息 + if material_cache: + cache_items = list(material_cache.items())[:5] + for name, material_id in cache_items: + self.hardware_interface._logger.debug( + f"缓存物料: name={name}, id={material_id}" + ) + + # 直接从缓存中查找 + if material_name in material_cache: + material_id = material_cache[material_name] + self.hardware_interface._logger.info( + f"找到物料: {material_name} -> ID: {material_id}" + ) + return material_id + + self.hardware_interface._logger.warning( + f"未找到物料: {material_name} (缓存中无此物料)" + ) + + # 打印所有可用物料名称供参考 + available_materials = list(material_cache.keys()) + if available_materials: + self.hardware_interface._logger.info( + f"可用物料列表(前10个): {available_materials[:10]}" + ) + + return None + + except Exception as e: + self.hardware_interface._logger.error( + f"查询物料失败 {material_name}: {str(e)}" + ) + return None + if __name__ == "__main__": bioyond = BioyondDispensingStation(config={ diff --git a/unilabos/devices/workstation/bioyond_studio/reaction_station.py b/unilabos/devices/workstation/bioyond_studio/reaction_station.py index 68440a3b..ffb83fd3 100644 --- a/unilabos/devices/workstation/bioyond_studio/reaction_station.py +++ b/unilabos/devices/workstation/bioyond_studio/reaction_station.py @@ -5,6 +5,7 @@ from typing import List, Dict, Any from pathlib import Path from datetime import datetime from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation +from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import MachineState from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String from unilabos.devices.workstation.bioyond_studio.config import ( WORKFLOW_STEP_IDS, @@ -717,8 +718,7 @@ class BioyondReactionStation(BioyondWorkstation): if oc in self.order_completion_status: info = self.order_completion_status[oc] try: - rq = json.dumps({"order_id": oid}) - rep = self.hardware_interface.order_report(rq) + rep = self.hardware_interface.order_report(oid) if not rep: rep = {"error": "无法获取报告"} reports.append({ @@ -912,6 +912,106 @@ class BioyondReactionStation(BioyondWorkstation): """ return self.hardware_interface.create_order(json_str) + def hard_delete_merged_workflows(self, workflow_ids: List[str]) -> Dict[str, Any]: + """ + 调用新接口:硬删除合并后的工作流 + + Args: + workflow_ids: 要删除的工作流ID数组 + + Returns: + 删除结果 + """ + try: + if not isinstance(workflow_ids, list): + raise ValueError("workflow_ids必须是字符串数组") + return self._delete_project_api("/api/lims/order/workflows", workflow_ids) + except Exception as e: + print(f"❌ 硬删除异常: {str(e)}") + return {"code": 0, "message": str(e), "timestamp": int(time.time())} + + # ==================== 项目接口通用方法 ==================== + + def _post_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]: + """项目接口通用POST调用 + + 参数: + endpoint: 接口路径(例如 /api/lims/order/skip-titration-steps) + data: 请求体中的 data 字段内容 + + 返回: + dict: 服务端响应,失败时返回 {code:0,message,...} + """ + request_data = { + "apiKey": API_CONFIG["api_key"], + "requestTime": self.hardware_interface.get_current_time_iso8601(), + "data": data + } + print(f"\n📤 项目POST请求: {self.hardware_interface.host}{endpoint}") + print(json.dumps(request_data, indent=4, ensure_ascii=False)) + try: + response = requests.post( + f"{self.hardware_interface.host}{endpoint}", + json=request_data, + headers={"Content-Type": "application/json"}, + timeout=30 + ) + result = response.json() + if result.get("code") == 1: + print("✅ 请求成功") + else: + print(f"❌ 请求失败: {result.get('message','未知错误')}") + return result + except json.JSONDecodeError: + print("❌ 非JSON响应") + return {"code": 0, "message": "非JSON响应", "timestamp": int(time.time())} + except requests.exceptions.Timeout: + print("❌ 请求超时") + return {"code": 0, "message": "请求超时", "timestamp": int(time.time())} + except requests.exceptions.RequestException as e: + print(f"❌ 网络异常: {str(e)}") + return {"code": 0, "message": str(e), "timestamp": int(time.time())} + + def _delete_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]: + """项目接口通用DELETE调用 + + 参数: + endpoint: 接口路径(例如 /api/lims/order/workflows) + data: 请求体中的 data 字段内容 + + 返回: + dict: 服务端响应,失败时返回 {code:0,message,...} + """ + request_data = { + "apiKey": API_CONFIG["api_key"], + "requestTime": self.hardware_interface.get_current_time_iso8601(), + "data": data + } + print(f"\n📤 项目DELETE请求: {self.hardware_interface.host}{endpoint}") + print(json.dumps(request_data, indent=4, ensure_ascii=False)) + try: + response = requests.delete( + f"{self.hardware_interface.host}{endpoint}", + json=request_data, + headers={"Content-Type": "application/json"}, + timeout=30 + ) + result = response.json() + if result.get("code") == 1: + print("✅ 请求成功") + else: + print(f"❌ 请求失败: {result.get('message','未知错误')}") + return result + except json.JSONDecodeError: + print("❌ 非JSON响应") + return {"code": 0, "message": "非JSON响应", "timestamp": int(time.time())} + except requests.exceptions.Timeout: + print("❌ 请求超时") + return {"code": 0, "message": "请求超时", "timestamp": int(time.time())} + except requests.exceptions.RequestException as e: + print(f"❌ 网络异常: {str(e)}") + return {"code": 0, "message": str(e), "timestamp": int(time.time())} + # ==================== 工作流执行核心方法 ==================== def process_web_workflows(self, web_workflow_json: str) -> List[Dict[str, str]]: @@ -942,76 +1042,6 @@ class BioyondReactionStation(BioyondWorkstation): print(f"错误:处理工作流失败: {e}") return [] - def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict: - """ - 一站式处理工作流程:解析网页工作流列表,合并工作流(带参数),然后发布任务 - - Args: - workflow_name: 合并后的工作流名称 - task_name: 任务名称 - - Returns: - 任务创建结果 - """ - web_workflow_list = self.get_workflow_sequence() - print(f"\n{'='*60}") - print(f"📋 处理网页工作流列表: {web_workflow_list}") - print(f"{'='*60}") - - web_workflow_json = json.dumps({"web_workflow_list": web_workflow_list}) - workflows_result = self.process_web_workflows(web_workflow_json) - - if not workflows_result: - return self._create_error_result("处理网页工作流列表失败", "process_web_workflows") - - print(f"workflows_result 类型: {type(workflows_result)}") - print(f"workflows_result 内容: {workflows_result}") - - workflows_with_params = self._build_workflows_with_parameters(workflows_result) - - merge_data = { - "name": workflow_name, - "workflows": workflows_with_params - } - - # print(f"\n🔄 合并工作流(带参数),名称: {workflow_name}") - merged_workflow = self.merge_workflow_with_parameters(json.dumps(merge_data)) - - if not merged_workflow: - return self._create_error_result("合并工作流失败", "merge_workflow_with_parameters") - - workflow_id = merged_workflow.get("subWorkflows", [{}])[0].get("id", "") - # print(f"\n📤 使用工作流创建任务: {workflow_name} (ID: {workflow_id})") - - order_params = [{ - "orderCode": f"task_{self.hardware_interface.get_current_time_iso8601()}", - "orderName": task_name, - "workFlowId": workflow_id, - "borderNumber": 1, - "paramValues": {} - }] - - result = self.create_order(json.dumps(order_params)) - - if not result: - return self._create_error_result("创建任务失败", "create_order") - - # 清空工作流序列和参数,防止下次执行时累积重复 - self.pending_task_params = [] - self.clear_workflows() # 清空工作流序列,避免重复累积 - - # print(f"\n✅ 任务创建成功: {result}") - # print(f"\n✅ 任务创建成功") - print(f"{'='*60}\n") - - # 返回结果,包含合并后的工作流数据和订单参数 - return json.dumps({ - "success": True, - "result": result, - "merged_workflow": merged_workflow, - "order_params": order_params - }) - def _build_workflows_with_parameters(self, workflows_result: list) -> list: """ 构建带参数的工作流列表 @@ -1211,3 +1241,90 @@ class BioyondReactionStation(BioyondWorkstation): print(f" ❌ 工作流ID验证失败: {e}") print(f" 💡 将重新合并工作流") return False + + def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict: + """ + 一站式处理工作流程:解析网页工作流列表,合并工作流(带参数),然后发布任务 + + Args: + workflow_name: 合并后的工作流名称 + task_name: 任务名称 + + Returns: + 任务创建结果 + """ + web_workflow_list = self.get_workflow_sequence() + print(f"\n{'='*60}") + print(f"📋 处理网页工作流列表: {web_workflow_list}") + print(f"{'='*60}") + + web_workflow_json = json.dumps({"web_workflow_list": web_workflow_list}) + workflows_result = self.process_web_workflows(web_workflow_json) + + if not workflows_result: + return self._create_error_result("处理网页工作流列表失败", "process_web_workflows") + + print(f"workflows_result 类型: {type(workflows_result)}") + print(f"workflows_result 内容: {workflows_result}") + + workflows_with_params = self._build_workflows_with_parameters(workflows_result) + + merge_data = { + "name": workflow_name, + "workflows": workflows_with_params + } + + # print(f"\n🔄 合并工作流(带参数),名称: {workflow_name}") + merged_workflow = self.merge_workflow_with_parameters(json.dumps(merge_data)) + + if not merged_workflow: + return self._create_error_result("合并工作流失败", "merge_workflow_with_parameters") + + workflow_id = merged_workflow.get("subWorkflows", [{}])[0].get("id", "") + # print(f"\n📤 使用工作流创建任务: {workflow_name} (ID: {workflow_id})") + + order_params = [{ + "orderCode": f"task_{self.hardware_interface.get_current_time_iso8601()}", + "orderName": task_name, + "workFlowId": workflow_id, + "borderNumber": 1, + "paramValues": {} + }] + + result = self.create_order(json.dumps(order_params)) + + if not result: + return self._create_error_result("创建任务失败", "create_order") + + # 清空工作流序列和参数,防止下次执行时累积重复 + self.pending_task_params = [] + self.clear_workflows() # 清空工作流序列,避免重复累积 + + # print(f"\n✅ 任务创建成功: {result}") + # print(f"\n✅ 任务创建成功") + print(f"{'='*60}\n") + + # 返回结果,包含合并后的工作流数据和订单参数 + return json.dumps({ + "success": True, + "result": result, + "merged_workflow": merged_workflow, + "order_params": order_params + }) + + # ==================== 反应器操作接口 ==================== + + def skip_titration_steps(self, preintake_id: str) -> Dict[str, Any]: + """跳过当前正在进行的滴定步骤 + + Args: + preintake_id: 通量ID + + Returns: + Dict[str, Any]: 服务器响应,包含状态码、消息和时间戳 + """ + try: + return self._post_project_api("/api/lims/order/skip-titration-steps", preintake_id) + except Exception as e: + print(f"❌ 跳过滴定异常: {str(e)}") + return {"code": 0, "message": str(e), "timestamp": int(time.time())}