mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-15 13:44:39 +00:00
* cancel upload_registry * Refactor Bioyond workstation and experiment workflow -fix (#111) * refactor(bioyond_studio): 优化材料缓存加载和参数验证逻辑 改进材料缓存加载逻辑以支持多种材料类型和详细材料处理 更新工作流参数验证中的字段名从key/value改为Key/DisplayValue 移除未使用的merge_workflow_with_parameters方法 添加get_station_info方法获取工作站基础信息 清理实验文件中的注释代码和更新导入路径 * fix: 修复资源移除时的父资源检查问题 在BaseROS2DeviceNode中,移除资源前添加对父资源是否为None的检查,避免空指针异常 同时更新Bottle和BottleCarrier类以支持**kwargs参数 修正测试文件中Liquid_feeding_beaker的大小写拼写错误 * correct return message --------- Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>
762 lines
26 KiB
Python
762 lines
26 KiB
Python
# bioyond_rpc.py
|
||
"""
|
||
BioyondV1RPC类定义 - 负责HTTP接口通信和通用函数
|
||
仅包含基础的API调用、通用工具函数,不包含特定站点业务逻辑
|
||
"""
|
||
|
||
from enum import Enum
|
||
from datetime import datetime, timezone
|
||
from unilabos.device_comms.rpc import BaseRequest
|
||
from typing import Optional, List, Dict, Any
|
||
import json
|
||
from unilabos.devices.workstation.bioyond_studio.config import LOCATION_MAPPING
|
||
|
||
|
||
class SimpleLogger:
|
||
"""简单的日志记录器"""
|
||
def info(self, msg): print(f"[INFO] {msg}")
|
||
def error(self, msg): print(f"[ERROR] {msg}")
|
||
def debug(self, msg): print(f"[DEBUG] {msg}")
|
||
def warning(self, msg): print(f"[WARNING] {msg}")
|
||
def critical(self, msg): print(f"[CRITICAL] {msg}")
|
||
|
||
|
||
class MachineState(Enum):
|
||
INITIAL = 0
|
||
STOPPED = 1
|
||
RUNNING = 2
|
||
PAUSED = 3
|
||
ERROR_PAUSED = 4
|
||
ERROR_STOPPED = 5
|
||
|
||
|
||
class MaterialType(Enum):
|
||
Consumables = 0
|
||
Sample = 1
|
||
Reagent = 2
|
||
Product = 3
|
||
|
||
|
||
class BioyondException(Exception):
|
||
"""Bioyond操作异常"""
|
||
pass
|
||
|
||
|
||
class BioyondV1RPC(BaseRequest):
|
||
def __init__(self, config):
|
||
super().__init__()
|
||
print("开始初始化 BioyondV1RPC")
|
||
self.config = config
|
||
self.api_key = config["api_key"]
|
||
self.host = config["api_host"]
|
||
self._logger = SimpleLogger()
|
||
self.material_cache = {}
|
||
self._load_material_cache()
|
||
|
||
# ==================== 基础通用方法 ====================
|
||
|
||
def get_current_time_iso8601(self) -> str:
|
||
"""
|
||
获取当前时间,并格式化为 ISO 8601 格式(包含毫秒部分)。
|
||
|
||
:return: 当前时间的 ISO 8601 格式字符串
|
||
"""
|
||
current_time = datetime.now(timezone.utc).isoformat(
|
||
timespec='milliseconds'
|
||
)
|
||
# 替换时区部分为 'Z'
|
||
current_time = current_time.replace("+00:00", "Z")
|
||
return current_time
|
||
|
||
def get_logger(self):
|
||
return self._logger
|
||
|
||
# ==================== 物料查询相关接口 ====================
|
||
|
||
def stock_material(self, json_str: str) -> list:
|
||
"""
|
||
描述:返回所有当前在库的,已启用的物料
|
||
json_str 字段介绍,格式为JSON字符串:
|
||
'{"typeMode": 0, "filter": "样品", "includeDetail": true}'
|
||
|
||
typeMode: 物料类型, 样品1、试剂2、耗材0
|
||
filter: 过滤字段, 物料名称/物料编码
|
||
includeDetail: 是否包含所在库位。true,false
|
||
"""
|
||
try:
|
||
params = json.loads(json_str)
|
||
except json.JSONDecodeError:
|
||
return []
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/storage/stock-material',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return []
|
||
return response.get("data", [])
|
||
|
||
def query_warehouse_by_material_type(self, type_id: str) -> dict:
|
||
"""
|
||
描述:查询物料类型可以入库的库位
|
||
type_id: 物料类型ID
|
||
"""
|
||
params = {
|
||
"typeId": type_id
|
||
}
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/storage/warehouse-info-by-mat-type-id',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response:
|
||
return {}
|
||
|
||
if response['code'] != 1:
|
||
print(
|
||
f"query warehouse by material type error: {response.get('message', '')}"
|
||
)
|
||
return {}
|
||
|
||
return response.get("data", {})
|
||
|
||
def material_id_query(self, json_str: str) -> dict:
|
||
"""
|
||
查询物料id
|
||
json_str 格式为JSON字符串:
|
||
'{"material123"}'
|
||
"""
|
||
params = json_str
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/storage/workflow-sample-locations',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response:
|
||
return {}
|
||
|
||
if response['code'] != 1:
|
||
print(f"material_id_query error: {response.get('message')}")
|
||
return {}
|
||
|
||
print(f"material_id_query data: {response['data']}")
|
||
return response.get("data", {})
|
||
|
||
def add_material(self, params: dict) -> dict:
|
||
"""
|
||
描述:添加新的物料
|
||
json_str 格式为JSON字符串
|
||
"""
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/storage/material',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response:
|
||
return {}
|
||
|
||
if response['code'] != 1:
|
||
print(f"add material error: {response.get('message', '')}")
|
||
return {}
|
||
|
||
print(f"add material data: {response['data']}")
|
||
return response.get("data", {})
|
||
|
||
def query_matial_type_id(self, data) -> list:
|
||
"""查找物料typeid"""
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/storage/material-types',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": data
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return []
|
||
return str(response.get("data", {}))
|
||
|
||
def material_inbound(self, material_id: str, location_id: str) -> dict:
|
||
"""
|
||
描述:指定库位入库一个物料
|
||
material_id: 物料ID
|
||
location_name: 库位名称(会自动映射到location_id)
|
||
"""
|
||
params = {
|
||
"materialId": material_id,
|
||
"locationId": location_id
|
||
}
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/storage/inbound',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return {}
|
||
return response.get("data", {})
|
||
|
||
def delete_material(self, material_id: str) -> dict:
|
||
"""
|
||
描述:删除尚未入库的物料
|
||
material_id: 物料ID
|
||
"""
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/storage/delete-material',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": material_id
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return {}
|
||
return response.get("data", {})
|
||
|
||
def material_outbound(self, material_id: str, location_name: str, quantity: int) -> dict:
|
||
"""指定库位出库物料"""
|
||
location_id = LOCATION_MAPPING.get(location_name, location_name)
|
||
|
||
params = {
|
||
"materialId": material_id,
|
||
"locationId": location_id,
|
||
"quantity": quantity
|
||
}
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/storage/outbound',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return {}
|
||
return response
|
||
|
||
# ==================== 工作流查询相关接口 ====================
|
||
|
||
def query_workflow(self, json_str: str) -> dict:
|
||
try:
|
||
params = json.loads(json_str)
|
||
except json.JSONDecodeError:
|
||
print(f"无效的JSON字符串: {json_str}")
|
||
return {}
|
||
except Exception as e:
|
||
print(f"处理JSON时出错: {str(e)}")
|
||
return {}
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/workflow/work-flow-list',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return {}
|
||
return response.get("data", {})
|
||
|
||
def workflow_step_query(self, workflow_id: str) -> dict:
|
||
"""
|
||
描述:查询某一个子工作流的详细信息,包含所有步骤、参数信息
|
||
json_str 格式为JSON字符串:
|
||
'{"workflow_id": "workflow123"}'
|
||
"""
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/workflow/sub-workflow-step-parameters',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": workflow_id,
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return {}
|
||
return response.get("data", {})
|
||
|
||
def validate_workflow_parameters(self, workflows: List[Dict[str, Any]]) -> Dict[str, Any]:
|
||
"""验证工作流参数格式"""
|
||
try:
|
||
validation_errors = []
|
||
|
||
for i, workflow in enumerate(workflows):
|
||
workflow_errors = []
|
||
|
||
# 检查基本结构
|
||
if not isinstance(workflow, dict):
|
||
workflow_errors.append("工作流必须是字典类型")
|
||
continue
|
||
|
||
if "id" not in workflow:
|
||
workflow_errors.append("缺少必要的 'id' 字段")
|
||
|
||
# 检查 stepParameters(如果存在)
|
||
if "stepParameters" in workflow:
|
||
step_params = workflow["stepParameters"]
|
||
|
||
if not isinstance(step_params, dict):
|
||
workflow_errors.append("stepParameters 必须是字典类型")
|
||
else:
|
||
# 验证参数结构
|
||
for step_id, modules in step_params.items():
|
||
if not isinstance(modules, dict):
|
||
workflow_errors.append(f"步骤 {step_id} 的模块配置必须是字典类型")
|
||
continue
|
||
|
||
for module_name, params in modules.items():
|
||
if not isinstance(params, list):
|
||
workflow_errors.append(f"步骤 {step_id} 模块 {module_name} 的参数必须是列表类型")
|
||
continue
|
||
|
||
for j, param in enumerate(params):
|
||
if not isinstance(param, dict):
|
||
workflow_errors.append(f"步骤 {step_id} 模块 {module_name} 参数 {j} 必须是字典类型")
|
||
elif "Key" not in param or "DisplayValue" not in param:
|
||
workflow_errors.append(f"步骤 {step_id} 模块 {module_name} 参数 {j} 必须包含 Key 和 DisplayValue")
|
||
|
||
if workflow_errors:
|
||
validation_errors.append({
|
||
"workflow_index": i,
|
||
"workflow_id": workflow.get("id", "unknown"),
|
||
"errors": workflow_errors
|
||
})
|
||
|
||
if validation_errors:
|
||
return {
|
||
"valid": False,
|
||
"errors": validation_errors,
|
||
"message": f"发现 {len(validation_errors)} 个工作流存在验证错误"
|
||
}
|
||
else:
|
||
return {
|
||
"valid": True,
|
||
"message": f"所有 {len(workflows)} 个工作流验证通过"
|
||
}
|
||
|
||
except Exception as e:
|
||
return {
|
||
"valid": False,
|
||
"errors": [{"general_error": str(e)}],
|
||
"message": f"验证过程中发生异常: {str(e)}"
|
||
}
|
||
|
||
def get_workflow_parameter_template(self) -> Dict[str, Any]:
|
||
"""获取工作流参数模板"""
|
||
return {
|
||
"template": {
|
||
"name": "拼接后的长工作流的名称",
|
||
"workflows": [
|
||
{
|
||
"id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
|
||
"stepParameters": {
|
||
"步骤ID (UUID)": {
|
||
"模块名称": [
|
||
{
|
||
"key": "参数键名",
|
||
"value": "参数值或变量引用 {{index-m-n}}"
|
||
}
|
||
]
|
||
}
|
||
}
|
||
}
|
||
]
|
||
},
|
||
"parameter_descriptions": {
|
||
"name": "拼接后的长工作流名称",
|
||
"workflows": "待合并的子工作流列表",
|
||
"id": "子工作流 ID,对应工作流列表中 workflows 数组中每个对象的 id 字段",
|
||
"stepParameters": "步骤参数配置,如果子工作流没有参数则不需要填写"
|
||
}
|
||
}
|
||
|
||
# ==================== 任务订单相关接口 ====================
|
||
|
||
def create_order(self, json_str: str) -> dict:
|
||
"""
|
||
描述:新建并开始任务,返回需要的物料和入库的库位
|
||
json_str 格式为JSON字符串,包含任务参数
|
||
"""
|
||
try:
|
||
params = json.loads(json_str)
|
||
self._logger.info(f"创建任务参数: {params}")
|
||
self._logger.info(f"参数类型: {type(params)}")
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/order/order',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response:
|
||
raise BioyondException("API调用失败:未收到响应")
|
||
|
||
if response['code'] != 1:
|
||
error_msg = f"创建任务失败: {response.get('message', '未知错误')}"
|
||
self._logger.error(error_msg)
|
||
raise BioyondException(error_msg)
|
||
|
||
self._logger.info(f"创建任务成功,返回数据: {response['data']}")
|
||
result = str(response.get("data", {}))
|
||
return result
|
||
|
||
except BioyondException:
|
||
# 重新抛出BioyondException
|
||
raise
|
||
except json.JSONDecodeError as e:
|
||
error_msg = f"JSON解析失败: {str(e)}"
|
||
self._logger.error(error_msg)
|
||
raise BioyondException(error_msg) from e
|
||
except Exception as e:
|
||
# 捕获其他未预期的异常,转换为BioyondException
|
||
error_msg = f"创建任务时发生未预期的错误: {str(e)}"
|
||
self._logger.error(error_msg)
|
||
raise BioyondException(error_msg) from e
|
||
|
||
def order_query(self, json_str: str) -> dict:
|
||
"""
|
||
描述:查询任务列表
|
||
json_str 格式为JSON字符串
|
||
"""
|
||
try:
|
||
params = json.loads(json_str)
|
||
except json.JSONDecodeError:
|
||
return {}
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/order/order-list',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return {}
|
||
return response.get("data", {})
|
||
|
||
def order_report(self, json_str: str) -> dict:
|
||
"""
|
||
描述:查询某个任务明细
|
||
json_str 格式为JSON字符串:
|
||
'{"order_id": "order123"}'
|
||
"""
|
||
try:
|
||
data = json.loads(json_str)
|
||
order_id = data.get("order_id", "")
|
||
except json.JSONDecodeError:
|
||
return {}
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/order/order-report',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": order_id,
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return {}
|
||
return response.get("data", {})
|
||
|
||
def order_takeout(self, json_str: str) -> int:
|
||
"""
|
||
描述:取出任务产物
|
||
json_str 格式为JSON字符串:
|
||
'{"order_id": "order123", "preintake_id": "preintake123"}'
|
||
"""
|
||
try:
|
||
data = json.loads(json_str)
|
||
params = {
|
||
"orderId": data.get("order_id", ""),
|
||
"preintakeId": data.get("preintake_id", "")
|
||
}
|
||
except json.JSONDecodeError:
|
||
return 0
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/order/order-takeout',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params,
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return 0
|
||
return response.get("code", 0)
|
||
|
||
def sample_waste_removal(self, order_id: str) -> dict:
|
||
"""
|
||
样品/废料取出接口
|
||
|
||
参数:
|
||
- order_id: 订单ID
|
||
|
||
返回: 取出结果
|
||
"""
|
||
params = {"orderId": order_id}
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/order/take-out',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params
|
||
})
|
||
|
||
if not response:
|
||
return {}
|
||
|
||
if response['code'] != 1:
|
||
self._logger.error(f"样品废料取出错误: {response.get('message', '')}")
|
||
return {}
|
||
|
||
return response.get("data", {})
|
||
|
||
def cancel_order(self, json_str: str) -> bool:
|
||
"""
|
||
描述:取消指定任务
|
||
json_str 格式为JSON字符串:
|
||
'{"order_id": "order123"}'
|
||
"""
|
||
try:
|
||
data = json.loads(json_str)
|
||
order_id = data.get("order_id", "")
|
||
except json.JSONDecodeError:
|
||
return False
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/order/cancel-order',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": order_id,
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return False
|
||
return True
|
||
|
||
# ==================== 设备管理相关接口 ====================
|
||
|
||
def device_list(self, json_str: str = "") -> list:
|
||
"""
|
||
描述:获取所有设备列表
|
||
json_str 格式为JSON字符串,可选
|
||
"""
|
||
device_no = None
|
||
if json_str:
|
||
try:
|
||
data = json.loads(json_str)
|
||
device_no = data.get("device_no", None)
|
||
except json.JSONDecodeError:
|
||
pass
|
||
|
||
url = f'{self.host}/api/lims/device/device-list'
|
||
if device_no:
|
||
url += f'/{device_no}'
|
||
|
||
response = self.post(
|
||
url=url,
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return []
|
||
return response.get("data", [])
|
||
|
||
def device_operation(self, json_str: str) -> int:
|
||
"""
|
||
描述:操作设备
|
||
json_str 格式为JSON字符串
|
||
"""
|
||
try:
|
||
data = json.loads(json_str)
|
||
params = {
|
||
"deviceNo": data.get("device_no", ""),
|
||
"operationType": data.get("operation_type", 0),
|
||
"operationParams": data.get("operation_params", {})
|
||
}
|
||
except json.JSONDecodeError:
|
||
return 0
|
||
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/device/device-operation',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
"data": params,
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return 0
|
||
return response.get("code", 0)
|
||
|
||
# ==================== 调度器相关接口 ====================
|
||
|
||
def scheduler_status(self) -> dict:
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/scheduler/scheduler-status',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return {}
|
||
return response.get("data", {})
|
||
|
||
def scheduler_start(self) -> int:
|
||
"""描述:启动调度器"""
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/scheduler/start',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return 0
|
||
return response.get("code", 0)
|
||
|
||
def scheduler_pause(self) -> int:
|
||
"""描述:暂停调度器"""
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/scheduler/scheduler-pause',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return 0
|
||
return response.get("code", 0)
|
||
|
||
def scheduler_continue(self) -> int:
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/scheduler/scheduler-continue',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return 0
|
||
return response.get("code", 0)
|
||
|
||
def scheduler_stop(self) -> int:
|
||
"""描述:停止调度器"""
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/scheduler/scheduler-stop',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return 0
|
||
return response.get("code", 0)
|
||
|
||
def scheduler_reset(self) -> int:
|
||
"""描述:重置调度器"""
|
||
response = self.post(
|
||
url=f'{self.host}/api/lims/scheduler/scheduler-reset',
|
||
params={
|
||
"apiKey": self.api_key,
|
||
"requestTime": self.get_current_time_iso8601(),
|
||
})
|
||
|
||
if not response or response['code'] != 1:
|
||
return 0
|
||
return response.get("code", 0)
|
||
|
||
# ==================== 辅助方法 ====================
|
||
|
||
def _load_material_cache(self):
|
||
"""预加载材料列表到缓存中"""
|
||
try:
|
||
print("正在加载材料列表缓存...")
|
||
|
||
# 加载所有类型的材料:耗材(0)、样品(1)、试剂(2)
|
||
material_types = [1, 2]
|
||
|
||
for type_mode in material_types:
|
||
print(f"正在加载类型 {type_mode} 的材料...")
|
||
stock_query = f'{{"typeMode": {type_mode}, "includeDetail": true}}'
|
||
stock_result = self.stock_material(stock_query)
|
||
|
||
if isinstance(stock_result, str):
|
||
stock_data = json.loads(stock_result)
|
||
else:
|
||
stock_data = stock_result
|
||
|
||
materials = stock_data
|
||
for material in materials:
|
||
material_name = material.get("name")
|
||
material_id = material.get("id")
|
||
if material_name and material_id:
|
||
self.material_cache[material_name] = material_id
|
||
|
||
# 处理样品板等容器中的detail材料
|
||
detail_materials = material.get("detail", [])
|
||
for detail_material in detail_materials:
|
||
detail_name = detail_material.get("name")
|
||
detail_id = detail_material.get("detailMaterialId")
|
||
if detail_name and detail_id:
|
||
self.material_cache[detail_name] = detail_id
|
||
print(f"加载detail材料: {detail_name} -> ID: {detail_id}")
|
||
|
||
print(f"材料列表缓存加载完成,共加载 {len(self.material_cache)} 个材料")
|
||
|
||
except Exception as e:
|
||
print(f"加载材料列表缓存时出错: {e}")
|
||
self.material_cache = {}
|
||
|
||
def _get_material_id_by_name(self, material_name_or_id: str) -> str:
|
||
"""根据材料名称获取材料ID"""
|
||
if len(material_name_or_id) > 20 and '-' in material_name_or_id:
|
||
return material_name_or_id
|
||
|
||
if material_name_or_id in self.material_cache:
|
||
material_id = self.material_cache[material_name_or_id]
|
||
print(f"从缓存找到材料: {material_name_or_id} -> ID: {material_id}")
|
||
return material_id
|
||
|
||
print(f"警告: 未在缓存中找到材料名称 '{material_name_or_id}',将使用原值")
|
||
return material_name_or_id
|
||
|
||
def refresh_material_cache(self):
|
||
"""刷新材料列表缓存"""
|
||
print("正在刷新材料列表缓存...")
|
||
self._load_material_cache()
|
||
|
||
def get_available_materials(self):
|
||
"""获取所有可用的材料名称列表"""
|
||
return list(self.material_cache.keys()) |