feat: migrate to pymodbus 3.11.4 and update bioyond configs

PyModbus 3.x Migration:
- Copied modbus.py and client.py from dev branch for compatibility
- Rewrote FLOAT32 decoding using struct module in coin_cell_assembly.py
- Fixed STRING decoding for QR codes (battery and electrolyte barcodes)
- Tested successfully on hardware with correct data decoding

Bioyond Studio Updates:
- Updated bioyond_studio config.py
- Modified bioyond_cell_workstation.py
- Enhanced warehouse.py and decks.py
- Added README_WAREHOUSE.md documentation

Parameter Enhancements:
- Enhanced coin_cell_workstation.yaml parameter descriptions
- Added matrix position ranges and indexing rules

Breaking changes:
- Requires pymodbus >= 3.9.0
- Removed deprecated BinaryPayloadDecoder/BinaryPayloadBuilder
- Updated to use client.convert_from/to_registers() methods
This commit is contained in:
Andy6M
2026-01-10 17:01:40 +08:00
parent 936834f8c3
commit f355722281
29 changed files with 1460 additions and 8006 deletions

View File

@@ -161,6 +161,95 @@ class BioyondCellWorkstation(BioyondWorkstation):
logger.warning(f"任务未知状态 ({status}) (orderCode={order_code})")
return {"status": f"unknown_{status}", "report": report}
def get_material_info(self, material_id: str) -> Dict[str, Any]:
"""查询物料详细信息(物料详情接口)
Args:
material_id: 物料 ID (GUID)
Returns:
物料详情,包含 name, typeName, locations 等
"""
result = self._post_lims("/api/lims/storage/material-info", material_id)
return result.get("data", {})
def _process_order_reagents(self, report: Dict[str, Any]) -> Dict[str, Any]:
"""处理订单完成报文中的试剂数据,计算质量比
Args:
report: 订单完成推送的 report 数据
Returns:
{
"real_mass_ratio": {"试剂A": 0.6, "试剂B": 0.4},
"target_mass_ratio": {"试剂A": 0.6, "试剂B": 0.4},
"reagent_details": [...] # 详细数据
}
"""
used_materials = report.get("usedMaterials", [])
# 1. 筛选试剂typemode="2",注意是小写且是字符串)
reagents = [m for m in used_materials if str(m.get("typemode")) == "2"]
if not reagents:
logger.warning("订单完成报文中没有试剂typeMode=2")
return {
"real_mass_ratio": {},
"target_mass_ratio": {},
"reagent_details": []
}
# 2. 查询试剂名称
reagent_data = []
for reagent in reagents:
material_id = reagent.get("materialId")
if not material_id:
continue
try:
info = self.get_material_info(material_id)
name = info.get("name", f"Unknown_{material_id[:8]}")
real_qty = float(reagent.get("realQuantity", 0.0))
used_qty = float(reagent.get("usedQuantity", 0.0))
reagent_data.append({
"name": name,
"material_id": material_id,
"real_quantity": real_qty,
"used_quantity": used_qty
})
logger.info(f"试剂: {name}, 目标={used_qty}g, 实际={real_qty}g")
except Exception as e:
logger.error(f"查询物料信息失败: {material_id}, {e}")
continue
if not reagent_data:
return {
"real_mass_ratio": {},
"target_mass_ratio": {},
"reagent_details": []
}
# 3. 计算质量比
def calculate_mass_ratio(items: List[Dict], key: str) -> Dict[str, float]:
total = sum(item[key] for item in items)
if total == 0:
logger.warning(f"总质量为0无法计算{key}质量比")
return {item["name"]: 0.0 for item in items}
return {item["name"]: round(item[key] / total, 4) for item in items}
real_mass_ratio = calculate_mass_ratio(reagent_data, "real_quantity")
target_mass_ratio = calculate_mass_ratio(reagent_data, "used_quantity")
logger.info(f"真实质量比: {real_mass_ratio}")
logger.info(f"目标质量比: {target_mass_ratio}")
return {
"real_mass_ratio": real_mass_ratio,
"target_mass_ratio": target_mass_ratio,
"reagent_details": reagent_data
}
# -------------------- 基础HTTP封装 --------------------
def _url(self, path: str) -> str:
@@ -643,6 +732,21 @@ class BioyondCellWorkstation(BioyondWorkstation):
# 提取报文数据
if result.get("status") == "success":
report = result.get("report", {})
# [新增] 处理试剂数据,计算质量比
try:
mass_ratios = self._process_order_reagents(report)
report["mass_ratios"] = mass_ratios # 添加到报文中
logger.info(f"已计算订单 {order_code} 的试剂质量比")
except Exception as e:
logger.error(f"计算试剂质量比失败: {e}")
report["mass_ratios"] = {
"real_mass_ratio": {},
"target_mass_ratio": {},
"reagent_details": [],
"error": str(e)
}
all_reports.append(report)
print(f"[create_orders] ✓ 订单 {order_code} 完成")
else:
@@ -672,6 +776,252 @@ class BioyondCellWorkstation(BioyondWorkstation):
return final_result
def create_orders_v2(self, xlsx_path: str) -> Dict[str, Any]:
"""
从 Excel 解析并创建实验2.14- V2版本
约定:
- batchId = Excel 文件名(不含扩展名)
- 物料列:所有以 "(g)" 结尾(不再读取"总质量(g)"列)
- totalMass 自动计算为所有物料质量之和
- createTime 缺失或为空时自动填充为当前日期YYYY/M/D
"""
default_path = Path("D:\\UniLab\\Uni-Lab-OS\\unilabos\\devices\\workstation\\bioyond_studio\\bioyond_cell\\2025122301.xlsx")
path = Path(xlsx_path) if xlsx_path else default_path
print(f"[create_orders_v2] 使用 Excel 路径: {path}")
if path != default_path:
print("[create_orders_v2] 来源: 调用方传入自定义路径")
else:
print("[create_orders_v2] 来源: 使用默认模板路径")
if not path.exists():
print(f"[create_orders_v2] ⚠️ Excel 文件不存在: {path}")
raise FileNotFoundError(f"未找到 Excel 文件:{path}")
try:
df = pd.read_excel(path, sheet_name=0, engine="openpyxl")
except Exception as e:
raise RuntimeError(f"读取 Excel 失败:{e}")
print(f"[create_orders_v2] Excel 读取成功,行数: {len(df)}, 列: {list(df.columns)}")
# 列名容错:返回可选列名,找不到则返回 None
def _pick(col_names: List[str]) -> Optional[str]:
for c in col_names:
if c in df.columns:
return c
return None
col_order_name = _pick(["配方ID", "orderName", "订单编号"])
col_create_time = _pick(["创建日期", "createTime"])
col_bottle_type = _pick(["配液瓶类型", "bottleType"])
col_mix_time = _pick(["混匀时间(s)", "mixTime"])
col_load = _pick(["扣电组装分液体积", "loadSheddingInfo"])
col_pouch = _pick(["软包组装分液体积", "pouchCellInfo"])
col_cond = _pick(["电导测试分液体积", "conductivityInfo"])
col_cond_cnt = _pick(["电导测试分液瓶数", "conductivityBottleCount"])
print("[create_orders_v2] 列匹配结果:", {
"order_name": col_order_name,
"create_time": col_create_time,
"bottle_type": col_bottle_type,
"mix_time": col_mix_time,
"load": col_load,
"pouch": col_pouch,
"conductivity": col_cond,
"conductivity_bottle_count": col_cond_cnt,
})
# 物料列:所有以 (g) 结尾
material_cols = [c for c in df.columns if isinstance(c, str) and c.endswith("(g)")]
print(f"[create_orders_v2] 识别到的物料列: {material_cols}")
if not material_cols:
raise KeyError("未发现任何以“(g)”结尾的物料列,请检查表头。")
batch_id = path.stem
def _to_ymd_slash(v) -> str:
# 统一为 "YYYY/M/D";为空或解析失败则用当前日期
if v is None or (isinstance(v, float) and pd.isna(v)) or str(v).strip() == "":
ts = datetime.now()
else:
try:
ts = pd.to_datetime(v)
except Exception:
ts = datetime.now()
return f"{ts.year}/{ts.month}/{ts.day}"
def _as_int(val, default=0) -> int:
try:
if pd.isna(val):
return default
return int(val)
except Exception:
return default
def _as_float(val, default=0.0) -> float:
try:
if pd.isna(val):
return default
return float(val)
except Exception:
return default
def _as_str(val, default="") -> str:
if val is None or (isinstance(val, float) and pd.isna(val)):
return default
s = str(val).strip()
return s if s else default
orders: List[Dict[str, Any]] = []
for idx, row in df.iterrows():
mats: List[Dict[str, Any]] = []
total_mass = 0.0
for mcol in material_cols:
val = row.get(mcol, None)
if val is None or (isinstance(val, float) and pd.isna(val)):
continue
try:
mass = float(val)
except Exception:
continue
if mass > 0:
mats.append({"name": mcol.replace("(g)", ""), "mass": mass})
total_mass += mass
else:
if mass < 0:
print(f"[create_orders_v2] 第 {idx+1} 行物料 {mcol} 数值为负数: {mass}")
order_data = {
"batchId": batch_id,
"orderName": _as_str(row[col_order_name], default=f"{batch_id}_order_{idx+1}") if col_order_name else f"{batch_id}_order_{idx+1}",
"createTime": _to_ymd_slash(row[col_create_time]) if col_create_time else _to_ymd_slash(None),
"bottleType": _as_str(row[col_bottle_type], default="配液小瓶") if col_bottle_type else "配液小瓶",
"mixTime": _as_int(row[col_mix_time]) if col_mix_time else 0,
"loadSheddingInfo": _as_float(row[col_load]) if col_load else 0.0,
"pouchCellInfo": _as_float(row[col_pouch]) if col_pouch else 0,
"conductivityInfo": _as_float(row[col_cond]) if col_cond else 0,
"conductivityBottleCount": _as_int(row[col_cond_cnt]) if col_cond_cnt else 0,
"materialInfos": mats,
"totalMass": round(total_mass, 4) # 自动汇总
}
print(f"[create_orders_v2] 第 {idx+1} 行解析结果: orderName={order_data['orderName']}, "
f"loadShedding={order_data['loadSheddingInfo']}, pouchCell={order_data['pouchCellInfo']}, "
f"conductivity={order_data['conductivityInfo']}, totalMass={order_data['totalMass']}, "
f"material_count={len(mats)}")
if order_data["totalMass"] <= 0:
print(f"[create_orders_v2] ⚠️ 第 {idx+1} 行总质量 <= 0可能导致 LIMS 校验失败")
if not mats:
print(f"[create_orders_v2] ⚠️ 第 {idx+1} 行未找到有效物料")
orders.append(order_data)
print("================================================")
print("orders:", orders)
print(f"[create_orders_v2] 即将提交订单数量: {len(orders)}")
response = self._post_lims("/api/lims/order/orders", orders)
print(f"[create_orders_v2] 接口返回: {response}")
# 提取所有返回的 orderCode
data_list = response.get("data", [])
if not data_list:
logger.error("创建订单未返回有效数据!")
return response
# 收集所有 orderCode
order_codes = []
for order_item in data_list:
code = order_item.get("orderCode")
if code:
order_codes.append(code)
if not order_codes:
logger.error("未找到任何有效的 orderCode")
return response
print(f"[create_orders_v2] 等待 {len(order_codes)} 个订单完成: {order_codes}")
# ========== 步骤1: 等待所有订单完成并收集报文(不计算质量比)==========
all_reports = []
for idx, order_code in enumerate(order_codes, 1):
print(f"[create_orders_v2] 正在等待第 {idx}/{len(order_codes)} 个订单: {order_code}")
result = self.wait_for_order_finish(order_code)
# 提取报文数据
if result.get("status") == "success":
report = result.get("report", {})
all_reports.append(report)
print(f"[create_orders_v2] ✓ 订单 {order_code} 完成")
else:
logger.warning(f"订单 {order_code} 状态异常: {result.get('status')}")
# 即使订单失败,也记录下这个结果
all_reports.append({
"orderCode": order_code,
"status": result.get("status"),
"error": result.get("message", "未知错误")
})
print(f"[create_orders_v2] 所有订单已完成,共收集 {len(all_reports)} 个报文")
# ========== 步骤2: 统一计算所有订单的质量比 ==========
print(f"[create_orders_v2] 开始统一计算 {len(all_reports)} 个订单的质量比...")
all_mass_ratios = [] # 存储所有订单的质量比与reports顺序一致
for idx, report in enumerate(all_reports, 1):
order_code = report.get("orderCode", "N/A")
print(f"[create_orders_v2] 计算第 {idx}/{len(all_reports)} 个订单 {order_code} 的质量比...")
# 只为成功完成的订单计算质量比
if "error" not in report:
try:
mass_ratios = self._process_order_reagents(report)
# 精简输出,只保留核心质量比信息
all_mass_ratios.append({
"orderCode": order_code,
"orderName": report.get("orderName", "N/A"),
"real_mass_ratio": mass_ratios.get("real_mass_ratio", {}),
"target_mass_ratio": mass_ratios.get("target_mass_ratio", {})
})
logger.info(f"✓ 已计算订单 {order_code} 的试剂质量比")
except Exception as e:
logger.error(f"计算订单 {order_code} 质量比失败: {e}")
all_mass_ratios.append({
"orderCode": order_code,
"orderName": report.get("orderName", "N/A"),
"real_mass_ratio": {},
"target_mass_ratio": {},
"error": str(e)
})
else:
# 失败的订单不计算质量比
all_mass_ratios.append({
"orderCode": order_code,
"orderName": report.get("orderName", "N/A"),
"real_mass_ratio": {},
"target_mass_ratio": {},
"error": "订单未成功完成"
})
print(f"[create_orders_v2] 质量比计算完成")
print("实验记录本========================create_orders_v2========================")
# 返回所有订单的完成报文
final_result = {
"status": "all_completed",
"total_orders": len(order_codes),
"bottle_count": len(order_codes), # 明确标注瓶数用于下游check
"reports": all_reports, # 原始订单报文(不含质量比)
"mass_ratios": all_mass_ratios, # 所有质量比统一放在这里
"original_response": response
}
print(f"返回报文数量: {len(all_reports)}")
for i, report in enumerate(all_reports, 1):
print(f"报文 {i}: orderCode={report.get('orderCode', 'N/A')}, status={report.get('status', 'N/A')}")
print("========================")
return final_result
# 2.7 启动调度
def scheduler_start(self) -> Dict[str, Any]:
return self._post_lims("/api/lims/scheduler/start")
@@ -683,7 +1033,7 @@ class BioyondCellWorkstation(BioyondWorkstation):
请求体只包含 apiKey 和 requestTime
"""
return self._post_lims("/api/lims/scheduler/stop")
# 2.9 继续调度
# 2.9 继续调度
def scheduler_continue(self) -> Dict[str, Any]:
"""
@@ -867,6 +1217,48 @@ class BioyondCellWorkstation(BioyondWorkstation):
result = self.wait_for_order_finish(order_code)
return result
def transfer_3_to_2(self,
source_wh_id: Optional[str] = '3a19debc-84b4-0359-e2d4-b3beea49348b',
source_x: int = 1,
source_y: int = 1,
source_z: int = 1) -> Dict[str, Any]:
"""
2.34 3-2 物料转运接口
新建从 3 -> 2 的搬运任务
Args:
source_wh_id: 来源仓库 Id (默认为3号仓库)
source_x: 来源位置 X 坐标
source_y: 来源位置 Y 坐标
source_z: 来源位置 Z 坐标
Returns:
dict: 包含任务 orderId 和 orderCode 的响应
"""
payload: Dict[str, Any] = {
"sourcePosX": source_x,
"sourcePosY": source_y,
"sourcePosZ": source_z
}
if source_wh_id:
payload["sourceWHID"] = source_wh_id
logger.info(f"[transfer_3_to_2] 开始转运: 仓库={source_wh_id}, 位置=({source_x}, {source_y}, {source_z})")
response = self._post_lims("/api/lims/order/transfer-task3To2", payload)
# 等待任务报送成功
order_code = response.get("data", {}).get("orderCode")
if not order_code:
logger.error("[transfer_3_to_2] 转运任务未返回有效 orderCode")
return response
logger.info(f"[transfer_3_to_2] 转运任务已创建: {order_code}")
# 等待完成报送
result = self.wait_for_order_finish(order_code)
logger.info(f"[transfer_3_to_2] 转运任务完成: {order_code}")
return result
# 3.35 1→2 物料转运
def transfer_1_to_2(self) -> Dict[str, Any]:
"""
@@ -874,14 +1266,28 @@ class BioyondCellWorkstation(BioyondWorkstation):
URL: /api/lims/order/transfer-task1To2
只需要 apiKey 和 requestTime
"""
logger.info("[transfer_1_to_2] 开始 1→2 物料转运")
response = self._post_lims("/api/lims/order/transfer-task1To2")
# 等待任务报送成功
order_code = response.get("data", {}).get("orderCode")
logger.info(f"[transfer_1_to_2] API Response: {response}")
# 等待任务报送成功 - 处理不同的响应格式
order_code = None
data_field = response.get("data")
if isinstance(data_field, dict):
order_code = data_field.get("orderCode")
elif isinstance(data_field, str):
# 某些接口可能直接返回 orderCode 字符串
order_code = data_field
if not order_code:
logger.error("上料任务未返回有效 orderCode")
logger.error(f"[transfer_1_to_2] 转运任务未返回有效 orderCode响应: {response}")
return response
# 等待完成报送
logger.info(f"[transfer_1_to_2] 转运任务已创建: {order_code}")
# 等待完成报送
result = self.wait_for_order_finish(order_code)
logger.info(f"[transfer_1_to_2] 转运任务完成: {order_code}")
return result
# 2.5 批量查询实验报告(post过滤关键字查询)

View File

@@ -0,0 +1,12 @@
import pubchempy as pcp
cas = "21324-40-3" # 示例
comps = pcp.get_compounds(cas, namespace="name")
if not comps:
raise ValueError("No hit")
c = comps[0]
print("Canonical SMILES:", c.canonical_smiles)
print("Isomeric SMILES:", c.isomeric_smiles)
print("MW:", c.molecular_weight)

View File

@@ -133,6 +133,46 @@ WAREHOUSE_MAPPING = {
"J03": "3a19deae-2c7a-f237-89d9-8fe19025dee9"
}
},
"手动传递窗右": {
"uuid": "",
"site_uuids": {
"A01": "3a19deae-2c7a-36f5-5e41-02c5b66feaea",
"A02": "3a19deae-2c7a-dc6d-c41e-ef285d946cfe",
"A03": "3a19deae-2c7a-5876-c454-6b7e224ca927",
"B01": "3a19deae-2c7a-2426-6d71-e9de3cb158b1",
"B02": "3a19deae-2c7a-79b0-5e44-efaafd1e4cf3",
"B03": "3a19deae-2c7a-b9eb-f4e3-e308e0cf839a",
"C01": "3a19deae-2c7a-32bc-768e-556647e292f3",
"C02": "3a19deae-2c7a-e97a-8484-f5a4599447c4",
"C03": "3a19deae-2c7a-3056-6504-10dc73fbc276",
"D01": "3a19deae-2c7a-ffad-875e-8c4cda61d440",
"D02": "3a19deae-2c7a-61be-601c-b6fb5610499a",
"D03": "3a19deae-2c7a-c0f7-05a7-e3fe2491e560",
"E01": "3a19deae-2c7a-a6f4-edd1-b436a7576363",
"E02": "3a19deae-2c7a-4367-96dd-1ca2186f4910",
"E03": "3a19deae-2c7a-b163-2219-23df15200311",
}
},
"手动传递窗左": {
"uuid": "",
"site_uuids": {
"F01": "3a19deae-2c7a-d594-fd6a-0d20de3c7c4a",
"F02": "3a19deae-2c7a-a194-ea63-8b342b8d8679",
"F03": "3a19deae-2c7a-f7c4-12bd-425799425698",
"G01": "3a19deae-2c7a-0b56-72f1-8ab86e53b955",
"G02": "3a19deae-2c7a-204e-95ed-1f1950f28343",
"G03": "3a19deae-2c7a-392b-62f1-4907c66343f8",
"H01": "3a19deae-2c7a-5602-e876-d27aca4e3201",
"H02": "3a19deae-2c7a-f15c-70e0-25b58a8c9702",
"H03": "3a19deae-2c7a-780b-8965-2e1345f7e834",
"I01": "3a19deae-2c7a-8849-e172-07de14ede928",
"I02": "3a19deae-2c7a-4772-a37f-ff99270bafc0",
"I03": "3a19deae-2c7a-cce7-6e4a-25ea4a2068c4",
"J01": "3a19deae-2c7a-1848-de92-b5d5ed054cc6",
"J02": "3a19deae-2c7a-1d45-b4f8-6f866530e205",
"J03": "3a19deae-2c7a-f237-89d9-8fe19025dee9"
}
},
"4号手套箱内部堆栈": {
"uuid": "",
"site_uuids": {