refactor(bioyond_cell_workstation): 重构物料创建与入库逻辑- 移除从CSV读取物料名称的功能

- 新增通过参数传递物料名称列表的方式- 抽离仓库位置加载逻辑至独立方法
- 简化物料创建与入库流程- 统一使用资源同步器进行数据同步
- 更新调用示例以适配新接口
This commit is contained in:
calvincao
2025-10-23 22:36:21 +08:00
parent f0ebcc60bb
commit e92d933968

View File

@@ -14,7 +14,7 @@ import socket
from urllib3 import response
from unilabos.devices.workstation.workstation_base import WorkstationBase
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation, BioyondResourceSynchronizer
from unilabos.devices.workstation.bioyond_studio.config import (
BIOYOND_FULL_CONFIG, WORKFLOW_MAPPINGS, MATERIAL_TYPE_MAPPINGS, WAREHOUSE_MAPPING
)
@@ -886,180 +886,132 @@ class BioyondCellWorkstation(BioyondWorkstation):
logger.info(f"物料创建完成,成功创建 {len(created_materials)}/{total} 个固体物料")
return created_materials
def create_and_inbound_materials_from_csv(
self,
csv_path: str = "solid_materials.csv",
def _sync_materials_safe(self) -> bool:
"""仅使用 BioyondResourceSynchronizer 执行同步(与 station.py 保持一致)。"""
if hasattr(self, 'resource_synchronizer') and self.resource_synchronizer:
try:
return bool(self.resource_synchronizer.sync_from_external())
except Exception as e:
logger.error(f"同步失败: {e}")
return False
logger.warning("资源同步器未初始化")
return False
def _load_warehouse_locations(self, warehouse_name: str) -> tuple[List[str], List[str]]:
"""从配置加载仓库位置信息
Args:
warehouse_name: 仓库名称
Returns:
(location_ids, position_names) 元组
"""
warehouse_mapping = self.bioyond_config.get("warehouse_mapping", WAREHOUSE_MAPPING)
if warehouse_name not in warehouse_mapping:
raise ValueError(f"配置中未找到仓库: {warehouse_name}。可用: {list(warehouse_mapping.keys())}")
site_uuids = warehouse_mapping[warehouse_name].get("site_uuids", {})
if not site_uuids:
raise ValueError(f"仓库 {warehouse_name} 没有配置位置")
# 按顺序获取位置ID和名称
location_ids = []
position_names = []
for key in sorted(site_uuids.keys()):
location_ids.append(site_uuids[key])
position_names.append(key)
return location_ids, position_names
def create_and_inbound_materials(
self,
material_names: Optional[List[str]] = None,
type_id: str = "3a190ca0-b2f6-9aeb-8067-547e72c11469",
warehouse_name: str = "粉末加样头堆栈"
) -> Dict[str, Any]:
"""
从CSV文件读取物料列表创建物料并批量入库到指定堆栈
传参与默认列表方式创建物料并入库不使用CSV
Args:
csv_path: CSV文件路径
type_id: 物料类型ID(默认为固体物料类型)
warehouse_name: 仓库名称(默认为"粉末加样头堆栈"
material_names: 物料名称列表;默认使用 [LiPF6, LiDFOB, DTD, LiFSI, LiPO2F2]
type_id: 物料类型ID
warehouse_name: 目标仓库名(用于取位置信息
Returns:
包含执行结果字典
执行结果字典
"""
logger.info("=" * 60)
logger.info(f"开始执行:从CSV读取物料列表并批量创建入库到 {warehouse_name}")
logger.info(f"开始执行:从参数创建物料并批量入库到 {warehouse_name}")
logger.info("=" * 60)
# 从配置中获取位置ID列表
warehouse_mapping = self.bioyond_config.get("warehouse_mapping", WAREHOUSE_MAPPING)
if warehouse_name not in warehouse_mapping:
error_msg = f"配置中未找到仓库: {warehouse_name}"
logger.error(error_msg)
logger.info(f"可用的仓库: {list(warehouse_mapping.keys())}")
return {"success": False, "error": error_msg}
warehouse_config = warehouse_mapping[warehouse_name]
site_uuids = warehouse_config.get("site_uuids", {})
if not site_uuids:
error_msg = f"仓库 {warehouse_name} 没有配置位置"
logger.error(error_msg)
return {"success": False, "error": error_msg}
# 按顺序获取位置IDA01, B01, C01...
all_location_ids = []
position_names = []
for key in sorted(site_uuids.keys()):
all_location_ids.append(site_uuids[key])
position_names.append(key)
logger.info(f"✓ 从配置文件加载 {len(all_location_ids)} 个位置")
logger.info(f" 仓库: {warehouse_name}")
logger.info(f" 位置范围: {position_names[0]} ~ {position_names[-1]}")
# 读取CSV文件
csv_file_path = Path(csv_path)
material_names = []
try:
df_materials = pd.read_csv(csv_file_path)
if 'material_name' in df_materials.columns:
material_names = df_materials['material_name'].dropna().astype(str).str.strip().tolist()
logger.info(f"✓ 成功从CSV文件读取 {len(material_names)} 个物料名称")
logger.info(f" 文件路径: {csv_file_path}")
# 1) 准备物料名称(默认值)
default_materials = ["LiPF6", "LiDFOB", "DTD", "LiFSI", "LiPO2F2"]
mat_names = [m.strip() for m in (material_names or default_materials) if str(m).strip()]
if not mat_names:
return {"success": False, "error": "物料名称列表为空"}
# 2) 加载仓库位置信息
all_location_ids, position_names = self._load_warehouse_locations(warehouse_name)
logger.info(f"✓ 加载 {len(all_location_ids)} 个位置 ({position_names[0]} ~ {position_names[-1]})")
# 限制数量不超过可用位置
if len(mat_names) > len(all_location_ids):
logger.warning(f"物料数量超出位置数量,仅处理前 {len(all_location_ids)}")
mat_names = mat_names[:len(all_location_ids)]
# 3) 创建物料
logger.info(f"\n【步骤1/3】创建 {len(mat_names)} 个固体物料...")
created_materials = self.create_solid_materials(mat_names, type_id)
if not created_materials:
return {"success": False, "error": "没有成功创建任何物料"}
# 4) 批量入库
logger.info(f"\n【步骤2/3】批量入库物料...")
location_ids = all_location_ids[:len(created_materials)]
selected_positions = position_names[:len(created_materials)]
inbound_items = [
{"materialId": mat["materialId"], "locationId": loc_id}
for mat, loc_id in zip(created_materials, location_ids)
]
for material, position in zip(created_materials, selected_positions):
logger.info(f" - {material['name']}{position}")
result = self.storage_batch_inbound(inbound_items)
if result.get("code") != 1:
logger.error(f"✗ 批量入库失败: {result}")
return {"success": False, "error": "批量入库失败", "created_materials": created_materials, "inbound_result": result}
logger.info("✓ 批量入库成功")
# 5) 同步
logger.info(f"\n【步骤3/3】同步物料数据...")
if self._sync_materials_safe():
logger.info("✓ 物料数据同步完成")
else:
logger.error(f"✗ CSV文件缺少 'material_name'")
return {"success": False, "error": "CSV文件缺少 'material_name'"}
except FileNotFoundError:
logger.error(f"✗ 未找到CSV文件: {csv_file_path}")
logger.info("请创建CSV文件格式")
logger.info(" material_name")
logger.info(" LiPF6")
logger.info(" LiDFOB")
logger.info(" ...")
return {"success": False, "error": f"未找到CSV文件: {csv_file_path}"}
logger.warning("⚠ 物料数据同步未完成(可忽略,不影响已创建与入库的数据)")
logger.info("\n" + "=" * 60)
logger.info("流程完成")
logger.info("=" * 60 + "\n")
return {
"success": True,
"created_materials": created_materials,
"inbound_result": result,
"total_created": len(created_materials),
"total_inbound": len(inbound_items),
"warehouse": warehouse_name,
"positions": selected_positions
}
except Exception as e:
logger.error(f"读取CSV文件失败: {e}")
return {"success": False, "error": f"读取CSV文件失败: {e}"}
if not material_names:
logger.error("CSV文件中没有有效的物料名称")
return {"success": False, "error": "CSV文件中没有有效的物料名称"}
# 检查物料数量
if len(material_names) > len(all_location_ids):
logger.warning(f"物料数量({len(material_names)})超过可用位置数量({len(all_location_ids)})")
logger.warning(f"将仅创建前 {len(all_location_ids)} 个物料")
material_names = material_names[:len(all_location_ids)]
# 准备位置信息
location_ids = all_location_ids[:len(material_names)]
selected_positions = position_names[:len(material_names)]
# 步骤1: 创建固体物料
logger.info(f"\n【步骤1/2】创建 {len(material_names)} 个固体物料...")
logger.info(f"物料类型ID: {type_id}")
logger.info(f"物料列表: {', '.join(material_names)}")
created_materials = self.create_solid_materials(
material_names=material_names,
type_id=type_id
)
if len(created_materials) != len(material_names):
logger.warning(f"创建的物料数量({len(created_materials)})与计划数量({len(material_names)})不匹配!")
logger.warning("将仅对成功创建的物料进行入库操作")
if not created_materials:
logger.error("没有成功创建任何物料")
return {"success": False, "error": "没有成功创建任何物料"}
# 步骤2: 批量入库到指定位置
logger.info(f"\n【步骤2/2】批量入库物料到 {warehouse_name}...")
inbound_items = []
for idx, material in enumerate(created_materials):
if idx < len(location_ids):
inbound_items.append({
"materialId": material["materialId"],
"locationId": location_ids[idx]
})
logger.info(f" - {material['name']} (ID: {material['materialId'][:8]}...) → 位置 {selected_positions[idx]}")
logger.info(f"\n正在执行批量入库,共 {len(inbound_items)} 条记录...")
result = self.storage_batch_inbound(inbound_items)
inbound_success = result.get("code") == 1
if inbound_success:
logger.info(f"✓ 批量入库成功!")
logger.info(f" 响应数据: {result.get('data', {})}")
# 步骤3: 同步物料数据
logger.info(f"\n【步骤3/3】同步物料数据到系统...")
if hasattr(self, 'resource_synchronizer') and self.resource_synchronizer:
try:
# 尝试同步不同类型的物料
# typeMode: 0=耗材, 1=样品, 2=试剂
sync_success = False
for type_mode in [0, 1, 2]:
try:
logger.info(f" 尝试同步 typeMode={type_mode} 的物料...")
bioyond_data = self.hardware_interface.stock_material(
f'{{"typeMode": {type_mode}, "includeDetail": true}}'
)
if bioyond_data:
logger.info(f" ✓ 获取到 {len(bioyond_data) if isinstance(bioyond_data, list) else 1} 条物料数据")
sync_success = True
except Exception as e:
logger.debug(f" typeMode={type_mode} 同步失败: {e}")
continue
if sync_success:
logger.info(f"✓ 物料数据同步完成")
else:
logger.warning(f"⚠ 物料数据同步未获取到数据(这是正常的,新创建的物料可能需要时间才能查询到)")
except Exception as e:
logger.warning(f"⚠ 物料数据同步出错: {e}")
logger.info(f" 提示:新创建的物料已成功入库,同步失败不影响使用")
else:
logger.warning("⚠ 资源同步器未初始化,跳过同步")
else:
logger.error(f"✗ 批量入库失败!")
logger.error(f" 响应: {result}")
logger.info("\n" + "=" * 60)
logger.info("固体物料创建和入库流程完成")
logger.info("=" * 60 + "\n")
return {
"success": inbound_success,
"created_materials": created_materials,
"inbound_result": result,
"total_created": len(created_materials),
"total_inbound": len(inbound_items),
"warehouse": warehouse_name,
"positions": selected_positions
}
logger.error(f"执行失败: {e}")
return {"success": False, "error": str(e)}
# --------------------------------
@@ -1070,7 +1022,7 @@ if __name__ == "__main__":
logger.info(ws.scheduler_start())
# 从CSV文件读取物料列表并批量创建入库
result = ws.create_and_inbound_materials_from_csv()
result = ws.create_and_inbound_materials()
# 继续后续流程
logger.info(ws.auto_feeding4to3()) #搬运物料到3号箱