Add batch creation of solid materials and inbound from a CSV file; update the default report_ip value in the configuration; add a solid_materials.csv file to support importing material names.

calvincao
2025-10-23 17:32:08 +08:00
parent 35fc2f5ea6
commit 1af6ffafc6
3 changed files with 243 additions and 2 deletions
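
A minimal usage sketch of the new flow (assumed to run from the module that defines BioyondCellWorkstation; the CSV path and warehouse name are simply the defaults of the new method, and the result handling is illustrative):

ws = BioyondCellWorkstation()
logger.info(ws.scheduler_start())

# Create every material listed in solid_materials.csv and inbound it to the powder dosing-head stack
result = ws.create_and_inbound_materials_from_csv(
    csv_path="solid_materials.csv",
    warehouse_name="粉末加样头堆栈",
)
if result["success"]:
    logger.info(f"Inbounded {result['total_inbound']} materials at positions {result['positions']}")
else:
    logger.error(f"Flow failed: {result.get('error', result.get('inbound_result'))}")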

View File

@@ -825,12 +825,221 @@ class BioyondCellWorkstation(BioyondWorkstation):
        logger.warning("Timed out waiting for a successful material transfer task")
        return False

    def create_solid_materials(self, material_names: List[str], type_id: str = "3a190ca0-b2f6-9aeb-8067-547e72c11469") -> List[Dict[str, Any]]:
        """
        Batch-create solid materials.

        Args:
            material_names: List of material names
            type_id: Material type ID (defaults to the solid material type)

        Returns:
            List of the created materials; each element contains the material info and its ID
        """
        created_materials = []
        total = len(material_names)

        for i, name in enumerate(material_names, 1):
            # Build the full request body according to the API documentation
            material_data = {
                "typeId": type_id,
                "name": name,
                "unit": "g",       # unit
                "quantity": 1,     # quantity (defaults to 1)
                "parameters": ""   # parameters field (empty string means no parameters)
            }

            logger.info(f"Creating solid material {i}/{total}: {name}")
            result = self._post_lims("/api/lims/storage/material", material_data)

            if result and result.get("code") == 1:
                # The data field may be a string (the material ID) or a dict containing an id field
                data = result.get("data")
                if isinstance(data, str):
                    # data is the material ID string itself
                    material_id = data
                elif isinstance(data, dict):
                    # data is a dict with an id field
                    material_id = data.get("id")
                else:
                    material_id = None

                if material_id:
                    created_materials.append({
                        "name": name,
                        "materialId": material_id,
                        "typeId": type_id
                    })
                    logger.info(f"✓ Created material: {name}, ID: {material_id}")
                else:
                    logger.error(f"✗ Failed to create material: {name}, no ID returned")
                    logger.error(f"  Response data: {result}")
            else:
                # result may be None when the request itself failed, so guard the lookups
                error_msg = (result or {}).get("error") or (result or {}).get("message", "unknown error")
                logger.error(f"✗ Failed to create material: {name}")
                logger.error(f"  Error message: {error_msg}")
                logger.error(f"  Full response: {result}")

            # Avoid sending requests too quickly
            time.sleep(0.3)

        logger.info(f"Material creation finished, {len(created_materials)}/{total} solid materials created successfully")
        return created_materials
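
    # For reference, a successful run of create_solid_materials returns a list shaped
    # like this (illustrative IDs only; real values come from the LIMS response):
    #   [
    #       {"name": "LiPF6", "materialId": "<uuid>", "typeId": "3a190ca0-b2f6-9aeb-8067-547e72c11469"},
    #       {"name": "LiDFOB", "materialId": "<uuid>", "typeId": "3a190ca0-b2f6-9aeb-8067-547e72c11469"},
    #   ]
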
    def create_and_inbound_materials_from_csv(
        self,
        csv_path: str = "solid_materials.csv",
        type_id: str = "3a190ca0-b2f6-9aeb-8067-547e72c11469",
        warehouse_name: str = "粉末加样头堆栈"
    ) -> Dict[str, Any]:
        """
        Read a material list from a CSV file, create the materials, and batch-inbound them to the given stack.

        Args:
            csv_path: Path to the CSV file
            type_id: Material type ID (defaults to the solid material type)
            warehouse_name: Warehouse name (defaults to "粉末加样头堆栈", the powder dosing-head stack)

        Returns:
            Dict describing the execution result
        """
        logger.info("=" * 60)
        logger.info(f"Reading material list from CSV, then batch creating and inbounding to {warehouse_name}")
        logger.info("=" * 60)

        # Get the list of location IDs from the configuration
        warehouse_mapping = self.bioyond_config.get("warehouse_mapping", WAREHOUSE_MAPPING)
        if warehouse_name not in warehouse_mapping:
            error_msg = f"Warehouse not found in configuration: {warehouse_name}"
            logger.error(error_msg)
            logger.info(f"Available warehouses: {list(warehouse_mapping.keys())}")
            return {"success": False, "error": error_msg}

        warehouse_config = warehouse_mapping[warehouse_name]
        site_uuids = warehouse_config.get("site_uuids", {})
        if not site_uuids:
            error_msg = f"Warehouse {warehouse_name} has no locations configured"
            logger.error(error_msg)
            return {"success": False, "error": error_msg}

        # Collect location IDs in slot order (A01, B01, C01, ...)
        all_location_ids = []
        position_names = []
        for key in sorted(site_uuids.keys()):
            all_location_ids.append(site_uuids[key])
            position_names.append(key)

        logger.info(f"✓ Loaded {len(all_location_ids)} locations from the configuration file")
        logger.info(f"  Warehouse: {warehouse_name}")
        logger.info(f"  Location range: {position_names[0]} ~ {position_names[-1]}")

        # Read the CSV file
        csv_file_path = Path(csv_path)
        material_names = []
        try:
            df_materials = pd.read_csv(csv_file_path)
            if 'material_name' in df_materials.columns:
                material_names = df_materials['material_name'].dropna().astype(str).str.strip().tolist()
                logger.info(f"✓ Read {len(material_names)} material names from the CSV file")
                logger.info(f"  File path: {csv_file_path}")
            else:
                logger.error("✗ CSV file is missing the 'material_name' column")
                return {"success": False, "error": "CSV file is missing the 'material_name' column"}
        except FileNotFoundError:
            logger.error(f"✗ CSV file not found: {csv_file_path}")
            logger.info("Please create a CSV file in the following format:")
            logger.info("  material_name")
            logger.info("  LiPF6")
            logger.info("  LiDFOB")
            logger.info("  ...")
            return {"success": False, "error": f"CSV file not found: {csv_file_path}"}
        except Exception as e:
            logger.error(f"✗ Failed to read CSV file: {e}")
            return {"success": False, "error": f"Failed to read CSV file: {e}"}

        if not material_names:
            logger.error("No valid material names in the CSV file")
            return {"success": False, "error": "No valid material names in the CSV file"}

        # Check the material count against the available locations
        if len(material_names) > len(all_location_ids):
            logger.warning(f"Number of materials ({len(material_names)}) exceeds the number of available locations ({len(all_location_ids)})")
            logger.warning(f"Only the first {len(all_location_ids)} materials will be created")
            material_names = material_names[:len(all_location_ids)]

        # Prepare the location information
        location_ids = all_location_ids[:len(material_names)]
        selected_positions = position_names[:len(material_names)]

        # Step 1: create the solid materials
        logger.info(f"\n[Step 1/2] Creating {len(material_names)} solid materials...")
        logger.info(f"Material type ID: {type_id}")
        logger.info(f"Material list: {', '.join(material_names)}")
        created_materials = self.create_solid_materials(
            material_names=material_names,
            type_id=type_id
        )

        if len(created_materials) != len(material_names):
            logger.warning(f"Number of created materials ({len(created_materials)}) does not match the planned number ({len(material_names)})!")
            logger.warning("Only the successfully created materials will be inbounded")

        if not created_materials:
            logger.error("No materials were created successfully")
            return {"success": False, "error": "No materials were created successfully"}

        # Step 2: batch-inbound the materials to the selected locations
        logger.info(f"\n[Step 2/2] Batch-inbounding materials to {warehouse_name}...")
        inbound_items = []
        for idx, material in enumerate(created_materials):
            if idx < len(location_ids):
                inbound_items.append({
                    "materialId": material["materialId"],
                    "locationId": location_ids[idx]
                })
                logger.info(f"  - {material['name']} (ID: {material['materialId'][:8]}...) → location {selected_positions[idx]}")

        logger.info(f"\nExecuting batch inbound, {len(inbound_items)} records in total...")
        result = self.storage_batch_inbound(inbound_items)
        if result.get("code") == 1:
            logger.info("✓ Batch inbound succeeded!")
            logger.info(f"  Response data: {result.get('data', {})}")
        else:
            logger.error("✗ Batch inbound failed!")
            logger.error(f"  Response: {result}")

        logger.info("\n" + "=" * 60)
        logger.info("Solid material creation and inbound flow finished")
        logger.info("=" * 60 + "\n")

        return {
            "success": result.get("code") == 1,
            "created_materials": created_materials,
            "inbound_result": result,
            "total_created": len(created_materials),
            "total_inbound": len(inbound_items),
            "warehouse": warehouse_name,
            "positions": selected_positions
        }

# --------------------------------
if __name__ == "__main__":
    ws = BioyondCellWorkstation()
    logger.info(ws.scheduler_start())

    # TODO: create and inbound new materials
    # Read the material list from the CSV file and batch create/inbound it
    result = ws.create_and_inbound_materials_from_csv()

    # Continue with the rest of the flow
    logger.info(ws.auto_feeding4to3())  # Move materials to box 3

    # Use forward slashes or a Path object for file paths
    excel_path = Path("unilabos/devices/workstation/bioyond_studio/bioyond_cell/2025092701.xlsx")

View File

@@ -0,0 +1,7 @@
material_name
LiPF6
LiDFOB
DTD
LiFSI
LiPO2F2

View File

@@ -25,7 +25,7 @@ BIOYOND_FULL_CONFIG = {
    # HTTP service configuration
    "HTTP_host": os.getenv("BIOYOND_HTTP_HOST", "0.0.0.0"),  # HTTP listen address ("0.0.0.0" listens on all network interfaces)
    "HTTP_port": int(os.getenv("BIOYOND_HTTP_PORT", "8080")),
    "report_ip": os.getenv("BIOYOND_REPORT_IP", "172.21.32.57"),  # Local IP address reported to Bioyond (leave empty to auto-detect)
    "report_ip": os.getenv("BIOYOND_REPORT_IP", "172.21.32.172"),  # Local IP address reported to Bioyond (leave empty to auto-detect)

    # Debug mode
    "debug_mode": os.getenv("BIOYOND_DEBUG_MODE", "False").lower() == "true",
@@ -112,6 +112,31 @@ WAREHOUSE_MAPPING = {
"B3": "3a14198c-c2d0-725e-523d-34c037ac2440",
"B4": "3a14198c-c2d0-efce-0939-69ca5a7dfd39"
}
},
"粉末加样头堆栈": {
"uuid": "",
"site_uuids": {
"A01": "3a19da56-1379-20c8-5886-f7c4fbcb5733",
"B01": "3a19da56-1379-2424-d751-fe6e94cef938",
"C01": "3a19da56-1379-271c-03e3-6bdb590e395e",
"D01": "3a19da56-1379-277f-2b1b-0d11f7cf92c6",
"E01": "3a19da56-1379-2f1c-a15b-e01db90eb39a",
"F01": "3a19da56-1379-3fa1-846b-088158ac0b3d",
"G01": "3a19da56-1379-5aeb-d0cd-d3b4609d66e1",
"H01": "3a19da56-1379-6077-8258-bdc036870b78",
"I01": "3a19da56-1379-863b-a120-f606baf04617",
"J01": "3a19da56-1379-8a74-74e5-35a9b41d4fd5",
"K01": "3a19da56-1379-b270-b7af-f18773918abe",
"L01": "3a19da56-1379-ba54-6d78-fd770a671ffc",
"M01": "3a19da56-1379-c22d-c96f-0ceb5eb54a04",
"N01": "3a19da56-1379-d64e-c6c5-c72ea4829888",
"O01": "3a19da56-1379-d887-1a3c-6f9cce90f90e",
"P01": "3a19da56-1379-e77d-0e65-7463b238a3b9",
"Q01": "3a19da56-1379-edf6-1472-802ddb628774",
"R01": "3a19da56-1379-f281-0273-e0ef78f0fd97",
"S01": "3a19da56-1379-f924-7f68-df1fa51489f4",
"T01": "3a19da56-1379-ff7c-1745-07e200b44ce2"
}
}
}
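
For reference, a standalone sketch of how create_and_inbound_materials_from_csv (in the diff above) turns this new stack entry into an ordered slot list; the mapping below is trimmed to three of the twenty slots, and the zip with CSV names is purely illustrative:

# Trimmed copy of the "粉末加样头堆栈" site_uuids above.
site_uuids = {
    "A01": "3a19da56-1379-20c8-5886-f7c4fbcb5733",
    "B01": "3a19da56-1379-2424-d751-fe6e94cef938",
    "C01": "3a19da56-1379-271c-03e3-6bdb590e395e",
}

# sorted() yields A01, B01, ..., T01 because every key is one letter plus the "01" suffix.
position_names = sorted(site_uuids)
location_ids = [site_uuids[k] for k in position_names]

# Materials are then paired with slots in CSV order (names from solid_materials.csv).
for name, location_id in zip(["LiPF6", "LiDFOB", "DTD"], location_ids):
    print(f"{name} -> {location_id}")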