refactor(bioyond_studio): 优化材料缓存加载和参数验证逻辑

改进材料缓存加载逻辑以支持多种材料类型和详细材料处理
更新工作流参数验证中的字段名从key/value改为Key/DisplayValue
移除未使用的merge_workflow_with_parameters方法
添加get_station_info方法获取工作站基础信息
清理实验文件中的注释代码和更新导入路径
This commit is contained in:
ZiWei
2025-10-16 23:58:24 +08:00
parent 398b2dde3f
commit f5753afb7c
5 changed files with 711 additions and 640 deletions

View File

@@ -245,79 +245,7 @@ class BioyondWorkstation(WorkstationBase):
}
# ==================== 工作流合并与参数设置 API ====================
def merge_workflow_with_parameters(self, json_str: str) -> dict:
"""合并工作流并设置参数"""
try:
# 解析输入的 JSON 数据
data = json.loads(json_str)
# 构造 API 请求参数
params = {
"name": data.get("name", ""),
"workflows": data.get("workflows", [])
}
# 验证必要参数
if not params["name"]:
return {
"code": 0,
"message": "工作流名称不能为空",
"timestamp": int(datetime.now().timestamp() * 1000)
}
if not params["workflows"]:
return {
"code": 0,
"message": "工作流列表不能为空",
"timestamp": int(datetime.now().timestamp() * 1000)
}
except json.JSONDecodeError as e:
return {
"code": 0,
"message": f"JSON 解析错误: {str(e)}",
"timestamp": int(datetime.now().timestamp() * 1000)
}
except Exception as e:
return {
"code": 0,
"message": f"参数处理错误: {str(e)}",
"timestamp": int(datetime.now().timestamp() * 1000)
}
# 发送 POST 请求到 Bioyond API
try:
response = self.hardware_interface.post(
url=f'{self.hardware_interface.host}/api/lims/workflow/merge-workflow-with-parameters',
params={
"apiKey": self.hardware_interface.api_key,
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": params,
})
# 处理响应
if not response:
return {
"code": 0,
"message": "API 请求失败,未收到响应",
"timestamp": int(datetime.now().timestamp() * 1000)
}
# 返回完整的响应结果
return {
"code": response.get("code", 0),
"message": response.get("message", ""),
"timestamp": response.get("timestamp", int(datetime.now().timestamp() * 1000))
}
except Exception as e:
return {
"code": 0,
"message": f"API 请求异常: {str(e)}",
"timestamp": int(datetime.now().timestamp() * 1000)
}
def append_to_workflow_sequence(self, web_workflow_name: str) -> bool:
# 检查是否为JSON格式的字符串
actual_workflow_name = web_workflow_name
@@ -391,6 +319,23 @@ class BioyondWorkstation(WorkstationBase):
# ==================== 基础物料管理接口 ====================
# ============ 工作站状态管理 ============
def get_station_info(self) -> Dict[str, Any]:
"""获取工作站基础信息
Returns:
Dict[str, Any]: 工作站基础信息包括设备ID、状态等
"""
return {
"device_id": getattr(self._ros_node, 'device_id', 'unknown'),
"station_type": "BioyondWorkstation",
"workflow_status": self.current_workflow_status.value if hasattr(self, 'current_workflow_status') else "unknown",
"is_busy": getattr(self, 'is_busy', False),
"deck_info": {
"name": self.deck.name if self.deck and hasattr(self.deck, 'name') else "unknown",
"children_count": len(self.deck.children) if self.deck and hasattr(self.deck, 'children') else 0
} if self.deck else None,
"hardware_interface": type(self.hardware_interface).__name__ if self.hardware_interface else None
}
def get_workstation_status(self) -> Dict[str, Any]:
"""获取工作站状态