Files
Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/bioyond_rpc.py
ZiWei a625a86e3e HR物料同步,前端展示位置修复 (#135)
* 更新Bioyond工作站配置,添加新的物料类型映射和载架定义,优化物料查询逻辑

* 添加Bioyond实验配置文件,定义物料类型映射和设备配置

* 更新bioyond_warehouse_reagent_stack方法,修正试剂堆栈尺寸和布局描述

* 更新Bioyond实验配置,修正物料类型映射,优化设备配置

* 更新Bioyond资源同步逻辑,优化物料入库流程,增强错误处理和日志记录

* 更新Bioyond资源,添加配液站和反应站专用载架,优化仓库工厂函数的排序方式

* 更新Bioyond资源,添加配液站和反应站相关载架,优化试剂瓶和样品瓶配置

* 更新Bioyond实验配置,修正试剂瓶载架ID,确保与设备匹配

* 更新Bioyond资源,移除反应站单烧杯载架,添加反应站单烧瓶载架分类

* Refactor Bioyond resource synchronization and update bottle carrier definitions

- Removed traceback printing in error handling for Bioyond synchronization.
- Enhanced logging for existing Bioyond material ID usage during synchronization.
- Added new bottle carrier definitions for single flask and updated existing ones.
- Refactored dispensing station and reaction station bottle definitions for clarity and consistency.
- Improved resource mapping and error handling in graphio for Bioyond resource conversion.
- Introduced layout parameter in warehouse factory for better warehouse configuration.

* 更新Bioyond仓库工厂,添加排序方式支持,优化坐标计算逻辑

* 更新Bioyond载架和甲板配置,调整样品板尺寸和仓库坐标

* 更新Bioyond资源同步,增强占用位置日志信息,修正坐标转换逻辑

* 更新Bioyond反应站和分配站配置,调整材料类型映射和ID,移除不必要的项

* support name change during materials change

* fix json dumps

* correct tip

* 优化调度器API路径,更新相关方法描述

* 更新 BIOYOND 载架相关文档,调整 API 以支持自带试剂瓶的载架类型,修复资源获取时的子物料处理逻辑

* 实现资源删除时的同步处理,优化出库操作逻辑

* 修复 ItemizedCarrier 中的可见性逻辑

* 保存 Bioyond 原始信息到 unilabos_extra,以便出库时查询

* 根据 resource.capacity 判断是试剂瓶(载架)还是多瓶载架,走不同的奔曜转换

* Fix bioyond bottle_carriers ordering

* 优化 Bioyond 物料同步逻辑,增强坐标解析和位置更新处理

* disable slave connect websocket

* correct remove_resource stats

* change uuid logger to trace level

* enable slave mode

* refactor(bioyond): 统一资源命名并优化物料同步逻辑

- 将DispensingStation和ReactionStation资源统一为PolymerStation命名
- 优化物料同步逻辑,支持耗材类型(typeMode=0)的查询
- 添加物料默认参数配置功能
- 调整仓库坐标布局
- 清理废弃资源定义

* feat(warehouses): 为仓库函数添加col_offset和layout参数

* refactor: 更新实验配置中的物料类型映射命名

将DispensingStation和ReactionStation的物料类型映射统一更名为PolymerStation,保持命名一致性

* fix: 更新实验配置中的载体名称从6VialCarrier到6StockCarrier

* feat(bioyond): 实现物料创建与入库分离逻辑

将物料同步流程拆分为两个独立阶段:transfer阶段只创建物料,add阶段执行入库
简化状态检查接口,仅返回连接状态

* fix(reaction_station): 修正液体进料烧杯体积单位并增强返回结果

将液体进料烧杯的体积单位从μL改为g以匹配实际使用场景
在返回结果中添加merged_workflow和order_params字段,提供更完整的工作流信息

* feat(dispensing_station): 在任务创建返回结果中添加order_params信息

在create_order方法返回结果中增加order_params字段,以便调用方获取完整的任务参数

* fix(dispensing_station): 修改90%物料分配逻辑从分成3份改为直接使用

原逻辑将主称固体平均分成3份作为90%物料,现改为直接使用main_portion

* feat(bioyond): 添加任务编码和任务ID的输出,支持批量任务创建后的状态监控

* refactor(registry): 简化设备配置中的任务结果处理逻辑

将多个单独的任务编码和ID字段合并为统一的return_info字段
更新相关描述以反映新的数据结构

* feat(工作站): 添加HTTP报送服务和任务完成状态跟踪

- 在graphio.py中添加API必需字段
- 实现工作站HTTP服务启动和停止逻辑
- 添加任务完成状态跟踪字典和等待方法
- 重写任务完成报送处理方法记录状态
- 支持批量任务完成等待和报告获取

* refactor(dispensing_station): 移除wait_for_order_completion_and_get_report功能

该功能已被wait_for_multiple_orders_and_get_reports替代,简化代码结构

* fix: 更新任务报告API错误

* fix(workstation_http_service): 修复状态查询中device_id获取逻辑

处理状态查询时安全获取device_id,避免因属性不存在导致的异常

* fix(bioyond_studio): 改进物料入库失败时的错误处理和日志记录

在物料入库API调用失败时,添加更详细的错误信息打印
同时修正station.py中对空响应和失败情况的判断逻辑

* refactor(bioyond): 优化瓶架载体的分配逻辑和注释说明

重构瓶架载体的分配逻辑,使用嵌套循环替代硬编码索引分配
添加更详细的坐标映射说明,明确PLR与Bioyond坐标的对应关系

* fix(bioyond_rpc): 修复物料入库成功时无data字段返回空的问题

当API返回成功但无data字段时,返回包含success标识的字典而非空字典

---------

Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>
Co-authored-by: Junhan Chang <changjh@dp.tech>
2025-11-15 03:11:34 +08:00

799 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# bioyond_rpc.py
"""
BioyondV1RPC类定义 - 负责HTTP接口通信和通用函数
仅包含基础的API调用、通用工具函数不包含特定站点业务逻辑
"""
from enum import Enum
from datetime import datetime, timezone
from unilabos.device_comms.rpc import BaseRequest
from typing import Optional, List, Dict, Any
import json
from unilabos.devices.workstation.bioyond_studio.config import LOCATION_MAPPING
class SimpleLogger:
"""简单的日志记录器"""
def info(self, msg): print(f"[INFO] {msg}")
def error(self, msg): print(f"[ERROR] {msg}")
def debug(self, msg): print(f"[DEBUG] {msg}")
def warning(self, msg): print(f"[WARNING] {msg}")
def critical(self, msg): print(f"[CRITICAL] {msg}")
class MachineState(Enum):
INITIAL = 0
STOPPED = 1
RUNNING = 2
PAUSED = 3
ERROR_PAUSED = 4
ERROR_STOPPED = 5
class MaterialType(Enum):
Consumables = 0
Sample = 1
Reagent = 2
Product = 3
class BioyondException(Exception):
"""Bioyond操作异常"""
pass
class BioyondV1RPC(BaseRequest):
def __init__(self, config):
super().__init__()
print("开始初始化 BioyondV1RPC")
self.config = config
self.api_key = config["api_key"]
self.host = config["api_host"]
self._logger = SimpleLogger()
self.material_cache = {}
self._load_material_cache()
# ==================== 基础通用方法 ====================
def get_current_time_iso8601(self) -> str:
"""
获取当前时间,并格式化为 ISO 8601 格式(包含毫秒部分)。
:return: 当前时间的 ISO 8601 格式字符串
"""
current_time = datetime.now(timezone.utc).isoformat(
timespec='milliseconds'
)
# 替换时区部分为 'Z'
current_time = current_time.replace("+00:00", "Z")
return current_time
def get_logger(self):
return self._logger
# ==================== 物料查询相关接口 ====================
def stock_material(self, json_str: str) -> list:
"""
描述:返回所有当前在库的,已启用的物料
json_str 字段介绍格式为JSON字符串:
'{"typeMode": 0, "filter": "样品", "includeDetail": true}'
typeMode: 物料类型, 样品1、试剂2、耗材0
filter: 过滤字段, 物料名称/物料编码
includeDetail: 是否包含所在库位。truefalse
"""
try:
params = json.loads(json_str)
except json.JSONDecodeError:
return []
response = self.post(
url=f'{self.host}/api/lims/storage/stock-material',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response or response['code'] != 1:
return []
return response.get("data", [])
def query_warehouse_by_material_type(self, type_id: str) -> dict:
"""
描述:查询物料类型可以入库的库位
type_id: 物料类型ID
"""
params = {
"typeId": type_id
}
response = self.post(
url=f'{self.host}/api/lims/storage/warehouse-info-by-mat-type-id',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response:
return {}
if response['code'] != 1:
print(
f"query warehouse by material type error: {response.get('message', '')}"
)
return {}
return response.get("data", {})
def material_id_query(self, json_str: str) -> dict:
"""
查询物料id
json_str 格式为JSON字符串:
'{"material123"}'
"""
params = json_str
response = self.post(
url=f'{self.host}/api/lims/storage/workflow-sample-locations',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response:
return {}
if response['code'] != 1:
print(f"material_id_query error: {response.get('message')}")
return {}
print(f"material_id_query data: {response['data']}")
return response.get("data", {})
def add_material(self, params: dict) -> dict:
"""
描述:添加新的物料
json_str 格式为JSON字符串
"""
response = self.post(
url=f'{self.host}/api/lims/storage/material',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response:
return {}
if response['code'] != 1:
print(f"add material error: {response.get('message', '')}")
return {}
print(f"add material data: {response['data']}")
return response.get("data", {})
def query_matial_type_id(self, data) -> list:
"""查找物料typeid"""
response = self.post(
url=f'{self.host}/api/lims/storage/material-types',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": data
})
if not response or response['code'] != 1:
return []
return str(response.get("data", {}))
def material_inbound(self, material_id: str, location_id: str) -> dict:
"""
描述:指定库位入库一个物料
material_id: 物料ID
location_name: 库位名称会自动映射到location_id
"""
params = {
"materialId": material_id,
"locationId": location_id
}
response = self.post(
url=f'{self.host}/api/lims/storage/inbound',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response or response['code'] != 1:
if response:
error_msg = response.get('message', '未知错误')
print(f"[ERROR] 物料入库失败: code={response.get('code')}, message={error_msg}")
else:
print(f"[ERROR] 物料入库失败: API 无响应")
return {}
# 入库成功时,即使没有 data 字段,也返回成功标识
return response.get("data") or {"success": True}
def delete_material(self, material_id: str) -> dict:
"""
描述:删除尚未入库的物料
material_id: 物料ID
"""
response = self.post(
url=f'{self.host}/api/lims/storage/delete-material',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": material_id
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def material_outbound(self, material_id: str, location_name: str, quantity: int) -> dict:
"""指定库位出库物料(通过库位名称)"""
location_id = LOCATION_MAPPING.get(location_name, location_name)
params = {
"materialId": material_id,
"locationId": location_id,
"quantity": quantity
}
response = self.post(
url=f'{self.host}/api/lims/storage/outbound',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response or response['code'] != 1:
return None
return response
def material_outbound_by_id(self, material_id: str, location_id: str, quantity: int) -> dict:
"""指定库位出库物料直接使用location_id
Args:
material_id: 物料ID
location_id: 库位ID不是库位名称是UUID
quantity: 数量
Returns:
dict: API响应失败返回None
"""
params = {
"materialId": material_id,
"locationId": location_id,
"quantity": quantity
}
response = self.post(
url=f'{self.host}/api/lims/storage/outbound',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response or response['code'] != 1:
return None
return response
# ==================== 工作流查询相关接口 ====================
def query_workflow(self, json_str: str) -> dict:
try:
params = json.loads(json_str)
except json.JSONDecodeError:
print(f"无效的JSON字符串: {json_str}")
return {}
except Exception as e:
print(f"处理JSON时出错: {str(e)}")
return {}
response = self.post(
url=f'{self.host}/api/lims/workflow/work-flow-list',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def workflow_step_query(self, workflow_id: str) -> dict:
"""
描述:查询某一个子工作流的详细信息,包含所有步骤、参数信息
json_str 格式为JSON字符串:
'{"workflow_id": "workflow123"}'
"""
response = self.post(
url=f'{self.host}/api/lims/workflow/sub-workflow-step-parameters',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": workflow_id,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def validate_workflow_parameters(self, workflows: List[Dict[str, Any]]) -> Dict[str, Any]:
"""验证工作流参数格式"""
try:
validation_errors = []
for i, workflow in enumerate(workflows):
workflow_errors = []
# 检查基本结构
if not isinstance(workflow, dict):
workflow_errors.append("工作流必须是字典类型")
continue
if "id" not in workflow:
workflow_errors.append("缺少必要的 'id' 字段")
# 检查 stepParameters如果存在
if "stepParameters" in workflow:
step_params = workflow["stepParameters"]
if not isinstance(step_params, dict):
workflow_errors.append("stepParameters 必须是字典类型")
else:
# 验证参数结构
for step_id, modules in step_params.items():
if not isinstance(modules, dict):
workflow_errors.append(f"步骤 {step_id} 的模块配置必须是字典类型")
continue
for module_name, params in modules.items():
if not isinstance(params, list):
workflow_errors.append(f"步骤 {step_id} 模块 {module_name} 的参数必须是列表类型")
continue
for j, param in enumerate(params):
if not isinstance(param, dict):
workflow_errors.append(f"步骤 {step_id} 模块 {module_name} 参数 {j} 必须是字典类型")
elif "Key" not in param or "DisplayValue" not in param:
workflow_errors.append(f"步骤 {step_id} 模块 {module_name} 参数 {j} 必须包含 Key 和 DisplayValue")
if workflow_errors:
validation_errors.append({
"workflow_index": i,
"workflow_id": workflow.get("id", "unknown"),
"errors": workflow_errors
})
if validation_errors:
return {
"valid": False,
"errors": validation_errors,
"message": f"发现 {len(validation_errors)} 个工作流存在验证错误"
}
else:
return {
"valid": True,
"message": f"所有 {len(workflows)} 个工作流验证通过"
}
except Exception as e:
return {
"valid": False,
"errors": [{"general_error": str(e)}],
"message": f"验证过程中发生异常: {str(e)}"
}
def get_workflow_parameter_template(self) -> Dict[str, Any]:
"""获取工作流参数模板"""
return {
"template": {
"name": "拼接后的长工作流的名称",
"workflows": [
{
"id": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
"stepParameters": {
"步骤ID (UUID)": {
"模块名称": [
{
"key": "参数键名",
"value": "参数值或变量引用 {{index-m-n}}"
}
]
}
}
}
]
},
"parameter_descriptions": {
"name": "拼接后的长工作流名称",
"workflows": "待合并的子工作流列表",
"id": "子工作流 ID对应工作流列表中 workflows 数组中每个对象的 id 字段",
"stepParameters": "步骤参数配置,如果子工作流没有参数则不需要填写"
}
}
# ==================== 任务订单相关接口 ====================
def create_order(self, json_str: str) -> dict:
"""
描述:新建并开始任务,返回需要的物料和入库的库位
json_str 格式为JSON字符串包含任务参数
"""
try:
params = json.loads(json_str)
self._logger.info(f"创建任务参数: {params}")
self._logger.info(f"参数类型: {type(params)}")
response = self.post(
url=f'{self.host}/api/lims/order/order',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response:
raise BioyondException("API调用失败未收到响应")
if response['code'] != 1:
error_msg = f"创建任务失败: {response.get('message', '未知错误')}"
self._logger.error(error_msg)
raise BioyondException(error_msg)
self._logger.info(f"创建任务成功,返回数据: {response['data']}")
result = str(response.get("data", {}))
return result
except BioyondException:
# 重新抛出BioyondException
raise
except json.JSONDecodeError as e:
error_msg = f"JSON解析失败: {str(e)}"
self._logger.error(error_msg)
raise BioyondException(error_msg) from e
except Exception as e:
# 捕获其他未预期的异常转换为BioyondException
error_msg = f"创建任务时发生未预期的错误: {str(e)}"
self._logger.error(error_msg)
raise BioyondException(error_msg) from e
def order_query(self, json_str: str) -> dict:
"""
描述:查询任务列表
json_str 格式为JSON字符串
"""
try:
params = json.loads(json_str)
except json.JSONDecodeError:
return {}
response = self.post(
url=f'{self.host}/api/lims/order/order-list',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def order_report(self, json_str: str) -> dict:
"""
描述:查询某个任务明细
json_str 格式为JSON字符串:
'{"order_id": "order123"}'
"""
try:
data = json.loads(json_str)
order_id = data.get("order_id", "")
except json.JSONDecodeError:
return {}
response = self.post(
url=f'{self.host}/api/lims/order/project-order-report',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_id,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def order_takeout(self, json_str: str) -> int:
"""
描述:取出任务产物
json_str 格式为JSON字符串:
'{"order_id": "order123", "preintake_id": "preintake123"}'
"""
try:
data = json.loads(json_str)
params = {
"orderId": data.get("order_id", ""),
"preintakeId": data.get("preintake_id", "")
}
except json.JSONDecodeError:
return 0
response = self.post(
url=f'{self.host}/api/lims/order/order-takeout',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def sample_waste_removal(self, order_id: str) -> dict:
"""
样品/废料取出接口
参数:
- order_id: 订单ID
返回: 取出结果
"""
params = {"orderId": order_id}
response = self.post(
url=f'{self.host}/api/lims/order/take-out',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params
})
if not response:
return {}
if response['code'] != 1:
self._logger.error(f"样品废料取出错误: {response.get('message', '')}")
return {}
return response.get("data", {})
def cancel_order(self, json_str: str) -> bool:
"""
描述:取消指定任务
json_str 格式为JSON字符串:
'{"order_id": "order123"}'
"""
try:
data = json.loads(json_str)
order_id = data.get("order_id", "")
except json.JSONDecodeError:
return False
response = self.post(
url=f'{self.host}/api/lims/order/cancel-order',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_id,
})
if not response or response['code'] != 1:
return False
return True
# ==================== 设备管理相关接口 ====================
def device_list(self, json_str: str = "") -> list:
"""
描述:获取所有设备列表
json_str 格式为JSON字符串可选
"""
device_no = None
if json_str:
try:
data = json.loads(json_str)
device_no = data.get("device_no", None)
except json.JSONDecodeError:
pass
url = f'{self.host}/api/lims/device/device-list'
if device_no:
url += f'/{device_no}'
response = self.post(
url=url,
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return []
return response.get("data", [])
def device_operation(self, json_str: str) -> int:
"""
描述:操作设备
json_str 格式为JSON字符串
"""
try:
data = json.loads(json_str)
params = {
"deviceNo": data.get("device_no", ""),
"operationType": data.get("operation_type", 0),
"operationParams": data.get("operation_params", {})
}
except json.JSONDecodeError:
return 0
response = self.post(
url=f'{self.host}/api/lims/device/device-operation',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
# ==================== 调度器相关接口 ====================
def scheduler_status(self) -> dict:
response = self.post(
url=f'{self.host}/api/lims/scheduler/scheduler-status',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def scheduler_start(self) -> int:
"""描述:启动调度器"""
response = self.post(
url=f'{self.host}/api/lims/scheduler/start',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def scheduler_pause(self) -> int:
"""描述:暂停调度器"""
response = self.post(
url=f'{self.host}/api/lims/scheduler/pause',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def scheduler_continue(self) -> int:
"""描述:继续调度器"""
response = self.post(
url=f'{self.host}/api/lims/scheduler/continue',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def scheduler_stop(self) -> int:
"""描述:停止调度器"""
response = self.post(
url=f'{self.host}/api/lims/scheduler/stop',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def scheduler_reset(self) -> int:
"""描述:复位调度器"""
response = self.post(
url=f'{self.host}/api/lims/scheduler/reset',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
# ==================== 辅助方法 ====================
def _load_material_cache(self):
"""预加载材料列表到缓存中"""
try:
print("正在加载材料列表缓存...")
# 加载所有类型的材料:耗材(0)、样品(1)、试剂(2)
material_types = [0, 1, 2]
for type_mode in material_types:
print(f"正在加载类型 {type_mode} 的材料...")
stock_query = f'{{"typeMode": {type_mode}, "includeDetail": true}}'
stock_result = self.stock_material(stock_query)
if isinstance(stock_result, str):
stock_data = json.loads(stock_result)
else:
stock_data = stock_result
materials = stock_data
for material in materials:
material_name = material.get("name")
material_id = material.get("id")
if material_name and material_id:
self.material_cache[material_name] = material_id
# 处理样品板等容器中的detail材料
detail_materials = material.get("detail", [])
for detail_material in detail_materials:
detail_name = detail_material.get("name")
detail_id = detail_material.get("detailMaterialId")
if detail_name and detail_id:
self.material_cache[detail_name] = detail_id
print(f"加载detail材料: {detail_name} -> ID: {detail_id}")
print(f"材料列表缓存加载完成,共加载 {len(self.material_cache)} 个材料")
except Exception as e:
print(f"加载材料列表缓存时出错: {e}")
self.material_cache = {}
def _get_material_id_by_name(self, material_name_or_id: str) -> str:
"""根据材料名称获取材料ID"""
if len(material_name_or_id) > 20 and '-' in material_name_or_id:
return material_name_or_id
if material_name_or_id in self.material_cache:
material_id = self.material_cache[material_name_or_id]
print(f"从缓存找到材料: {material_name_or_id} -> ID: {material_id}")
return material_id
print(f"警告: 未在缓存中找到材料名称 '{material_name_or_id}',将使用原值")
return material_name_or_id
def refresh_material_cache(self):
"""刷新材料列表缓存"""
print("正在刷新材料列表缓存...")
self._load_material_cache()
def get_available_materials(self):
"""获取所有可用的材料名称列表"""
return list(self.material_cache.keys())