mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-18 13:31:20 +00:00
feat(bioyond_studio): 添加项目API接口支持及优化物料管理功能
添加通用项目API接口方法(_post_project_api, _delete_project_api)用于与LIMS系统交互 实现compute_experiment_design方法用于实验设计计算 新增brief_step_parameters等订单相关接口方法 优化物料转移逻辑,增加异步任务处理 扩展BioyondV1RPC类,添加批量物料操作、订单状态管理等功能
This commit is contained in:
@@ -1,11 +1,17 @@
|
||||
from datetime import datetime
|
||||
import json
|
||||
import time
|
||||
from typing import Optional, Dict, Any
|
||||
from typing import Optional, Dict, Any, List
|
||||
import requests
|
||||
from unilabos.devices.workstation.bioyond_studio.config import API_CONFIG
|
||||
|
||||
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondException
|
||||
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
|
||||
|
||||
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, BaseROS2DeviceNode
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
import importlib
|
||||
|
||||
class BioyondDispensingStation(BioyondWorkstation):
|
||||
def __init__(
|
||||
@@ -28,6 +34,108 @@ class BioyondDispensingStation(BioyondWorkstation):
|
||||
# 用于跟踪任务完成状态的字典: {orderCode: {status, order_id, timestamp}}
|
||||
self.order_completion_status = {}
|
||||
|
||||
def _post_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]:
|
||||
"""项目接口通用POST调用
|
||||
|
||||
参数:
|
||||
endpoint: 接口路径(例如 /api/lims/order/brief-step-paramerers)
|
||||
data: 请求体中的 data 字段内容
|
||||
|
||||
返回:
|
||||
dict: 服务端响应,失败时返回 {code:0,message,...}
|
||||
"""
|
||||
request_data = {
|
||||
"apiKey": API_CONFIG["api_key"],
|
||||
"requestTime": self.hardware_interface.get_current_time_iso8601(),
|
||||
"data": data
|
||||
}
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{self.hardware_interface.host}{endpoint}",
|
||||
json=request_data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=30
|
||||
)
|
||||
result = response.json()
|
||||
return result if isinstance(result, dict) else {"code": 0, "message": "非JSON响应"}
|
||||
except json.JSONDecodeError:
|
||||
return {"code": 0, "message": "非JSON响应"}
|
||||
except requests.exceptions.Timeout:
|
||||
return {"code": 0, "message": "请求超时"}
|
||||
except requests.exceptions.RequestException as e:
|
||||
return {"code": 0, "message": str(e)}
|
||||
|
||||
def _delete_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]:
|
||||
"""项目接口通用DELETE调用
|
||||
|
||||
参数:
|
||||
endpoint: 接口路径(例如 /api/lims/order/workflows)
|
||||
data: 请求体中的 data 字段内容
|
||||
|
||||
返回:
|
||||
dict: 服务端响应,失败时返回 {code:0,message,...}
|
||||
"""
|
||||
request_data = {
|
||||
"apiKey": API_CONFIG["api_key"],
|
||||
"requestTime": self.hardware_interface.get_current_time_iso8601(),
|
||||
"data": data
|
||||
}
|
||||
try:
|
||||
response = requests.delete(
|
||||
f"{self.hardware_interface.host}{endpoint}",
|
||||
json=request_data,
|
||||
headers={"Content-Type": "application/json"},
|
||||
timeout=30
|
||||
)
|
||||
result = response.json()
|
||||
return result if isinstance(result, dict) else {"code": 0, "message": "非JSON响应"}
|
||||
except json.JSONDecodeError:
|
||||
return {"code": 0, "message": "非JSON响应"}
|
||||
except requests.exceptions.Timeout:
|
||||
return {"code": 0, "message": "请求超时"}
|
||||
except requests.exceptions.RequestException as e:
|
||||
return {"code": 0, "message": str(e)}
|
||||
|
||||
def compute_experiment_design(
|
||||
self,
|
||||
ratio: dict,
|
||||
wt_percent: str = "0.25",
|
||||
m_tot: str = "70",
|
||||
titration_percent: str = "0.03",
|
||||
) -> dict:
|
||||
try:
|
||||
if isinstance(ratio, str):
|
||||
try:
|
||||
ratio = json.loads(ratio)
|
||||
except Exception:
|
||||
ratio = {}
|
||||
root = str(Path(__file__).resolve().parents[3])
|
||||
if root not in sys.path:
|
||||
sys.path.append(root)
|
||||
try:
|
||||
mod = importlib.import_module("tem.compute")
|
||||
except Exception as e:
|
||||
raise BioyondException(f"无法导入计算模块: {e}")
|
||||
try:
|
||||
wp = float(wt_percent) if isinstance(wt_percent, str) else wt_percent
|
||||
mt = float(m_tot) if isinstance(m_tot, str) else m_tot
|
||||
tp = float(titration_percent) if isinstance(titration_percent, str) else titration_percent
|
||||
except Exception as e:
|
||||
raise BioyondException(f"参数解析失败: {e}")
|
||||
res = mod.generate_experiment_design(ratio=ratio, wt_percent=wp, m_tot=mt, titration_percent=tp)
|
||||
out = {
|
||||
"solutions": res.get("solutions", []),
|
||||
"titration": res.get("titration", {}),
|
||||
"solvents": res.get("solvents", {}),
|
||||
"feeding_order": res.get("feeding_order", []),
|
||||
"return_info": json.dumps(res, ensure_ascii=False)
|
||||
}
|
||||
return out
|
||||
except BioyondException:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise BioyondException(str(e))
|
||||
|
||||
# 90%10%小瓶投料任务创建方法
|
||||
def create_90_10_vial_feeding_task(self,
|
||||
order_name: str = None,
|
||||
@@ -649,6 +757,40 @@ class BioyondDispensingStation(BioyondWorkstation):
|
||||
self.hardware_interface._logger.error(error_msg)
|
||||
raise BioyondException(error_msg)
|
||||
|
||||
def brief_step_parameters(self, data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""获取简要步骤参数(站点项目接口)
|
||||
|
||||
参数:
|
||||
data: 查询参数字典
|
||||
|
||||
返回值:
|
||||
dict: 接口返回数据
|
||||
"""
|
||||
return self._post_project_api("/api/lims/order/brief-step-paramerers", data)
|
||||
|
||||
def project_order_report(self, order_id: str) -> Dict[str, Any]:
|
||||
"""查询项目端订单报告(兼容旧路径)
|
||||
|
||||
参数:
|
||||
order_id: 订单ID
|
||||
|
||||
返回值:
|
||||
dict: 报告数据
|
||||
"""
|
||||
return self._post_project_api("/api/lims/order/project-order-report", order_id)
|
||||
|
||||
def workflow_sample_locations(self, workflow_id: str) -> Dict[str, Any]:
|
||||
"""查询工作流样品库位(站点项目接口)
|
||||
|
||||
参数:
|
||||
workflow_id: 工作流ID
|
||||
|
||||
返回值:
|
||||
dict: 位置信息数据
|
||||
"""
|
||||
return self._post_project_api("/api/lims/storage/workflow-sample-locations", workflow_id)
|
||||
|
||||
|
||||
# 批量创建90%10%小瓶投料任务
|
||||
def batch_create_90_10_vial_feeding_tasks(self,
|
||||
titration,
|
||||
@@ -952,8 +1094,7 @@ class BioyondDispensingStation(BioyondWorkstation):
|
||||
|
||||
# 获取实验报告
|
||||
try:
|
||||
report_query = json.dumps({"order_id": order_id})
|
||||
report = self.hardware_interface.order_report(report_query)
|
||||
report = self.project_order_report(order_id)
|
||||
|
||||
if not report:
|
||||
self.hardware_interface._logger.warning(
|
||||
@@ -1129,7 +1270,7 @@ class BioyondDispensingStation(BioyondWorkstation):
|
||||
target_device_id = f"/devices{target_device_id}"
|
||||
else:
|
||||
target_device_id = f"/devices/{target_device_id}"
|
||||
|
||||
|
||||
self.hardware_interface._logger.info(
|
||||
f"目标设备ID标准化为: {target_device_id}"
|
||||
)
|
||||
@@ -1192,18 +1333,49 @@ class BioyondDispensingStation(BioyondWorkstation):
|
||||
f"可选库位: {list(stack_sites.keys())}"
|
||||
)
|
||||
|
||||
# 获取目标库位的UUID
|
||||
target_site_uuid = stack_sites[target_sites]
|
||||
if not target_site_uuid:
|
||||
raise ValueError(
|
||||
f"库位 {target_sites} 的 UUID 未配置,请在 WAREHOUSE_MAPPING 中完善"
|
||||
)
|
||||
|
||||
# 目标位点(包含UUID)
|
||||
future = ROS2DeviceNode.run_async_func(
|
||||
self._ros_node.get_resource_with_dir,
|
||||
True,
|
||||
**{
|
||||
"resource_id": f"/reaction_station_bioyond/Bioyond_Deck/{target_stack}",
|
||||
"with_children": True,
|
||||
},
|
||||
)
|
||||
# 等待异步完成后再获取结果
|
||||
if not future:
|
||||
raise ValueError(f"获取目标堆栈资源future无效: {target_stack}")
|
||||
while not future.done():
|
||||
time.sleep(0.1)
|
||||
target_site_resource = future.result()
|
||||
|
||||
# 调用父类的 transfer_resource_to_another 方法
|
||||
# 传入ResourcePLR对象
|
||||
self.transfer_resource_to_another(
|
||||
# 传入ResourcePLR对象和目标位点资源
|
||||
future = self.transfer_resource_to_another(
|
||||
resource=[material_resource],
|
||||
mount_resource=[],
|
||||
mount_resource=[target_site_resource],
|
||||
sites=[target_sites],
|
||||
mount_device_id=target_device_id
|
||||
)
|
||||
|
||||
# 等待异步任务完成(临时方案)
|
||||
import time
|
||||
time.sleep(5)
|
||||
# 等待异步任务完成(轮询直到完成,再取结果)
|
||||
if future:
|
||||
try:
|
||||
while not future.done():
|
||||
time.sleep(0.1)
|
||||
future.result()
|
||||
self.hardware_interface._logger.info(
|
||||
f"异步转移任务已完成: {material_name}"
|
||||
)
|
||||
except Exception as e:
|
||||
raise ValueError(f"转移任务执行失败: {str(e)}")
|
||||
|
||||
self.hardware_interface._logger.info(
|
||||
f"第{idx}组转移成功: {material_name} -> "
|
||||
|
||||
Reference in New Issue
Block a user