Update bioyond_cell_workstation.py

This commit is contained in:
lixinyu1011
2025-10-20 15:30:41 +08:00
parent d2352cc514
commit 2f25063bf1

View File

@@ -14,7 +14,6 @@ from urllib3 import response
from unilabos.devices.workstation.workstation_base import WorkstationBase from unilabos.devices.workstation.workstation_base import WorkstationBase
from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService
from unilabos.utils.log import logger from unilabos.utils.log import logger
from pylabrobot.resources.deck import Deck
def _iso_local_now_ms() -> str: def _iso_local_now_ms() -> str:
# 文档要求:到毫秒 + Z例如 2025-08-15T05:43:22.814Z # 文档要求:到毫秒 + Z例如 2025-08-15T05:43:22.814Z
@@ -43,13 +42,15 @@ class BioyondCellWorkstation(WorkstationBase):
"api_key": "8A819E5C", "api_key": "8A819E5C",
"timeout": 30, "timeout": 30,
"report_token": "CHANGE_ME_TOKEN", "report_token": "CHANGE_ME_TOKEN",
"HTTP_host": "192.168.1.104", "HTTP_host": "172.21.32.90",
"HTTP_port": 8080, "HTTP_port": 8080,
"debug_mode": False "debug_mode": False
} # report_token unilab自己的令牌report_token0928未启用 } # report_token unilab自己的令牌report_token0928未启用
self.debug_mode = self.bioyond_config["debug_mode"] self.debug_mode = self.bioyond_config["debug_mode"]
self.http_service_started = False self.http_service_started = False
super().__init__(station_resource=station_resource, *args, **kwargs) deck = kwargs.pop("deck", None)
self.device_id = kwargs.pop("device_id", "bioyond_cell_workstation")
super().__init__(deck=deck, station_resource=station_resource, *args, **kwargs)
logger.info(f"Bioyond工作站初始化完成 (debug_mode={self.debug_mode})") logger.info(f"Bioyond工作站初始化完成 (debug_mode={self.debug_mode})")
# 实例化并在后台线程启动 HTTP 报送服务 # 实例化并在后台线程启动 HTTP 报送服务
@@ -132,8 +133,7 @@ class BioyondCellWorkstation(WorkstationBase):
def auto_feeding4to3( def auto_feeding4to3(
self, self,
# ★ 修改点:默认模板路径 # ★ 修改点:默认模板路径
xlsx_path: Optional[str] = "unilabos/devices/workstation/bioyond_cell/样品导入模板.xlsx", xlsx_path: Optional[str] = "unilabos/devices/workstation/bioyond_studio/bioyond_cell/样品导入模板.xlsx",
# ---------------- WH4 - 加样头面 (Z=1, 12个点位) ---------------- # ---------------- WH4 - 加样头面 (Z=1, 12个点位) ----------------
WH4_x1_y1_z1_1_materialName: str = "", WH4_x1_y1_z1_1_quantity: float = 0.0, WH4_x1_y1_z1_1_materialName: str = "", WH4_x1_y1_z1_1_quantity: float = 0.0,
WH4_x2_y1_z1_2_materialName: str = "", WH4_x2_y1_z1_2_quantity: float = 0.0, WH4_x2_y1_z1_2_materialName: str = "", WH4_x2_y1_z1_2_quantity: float = 0.0,
@@ -196,7 +196,7 @@ class BioyondCellWorkstation(WorkstationBase):
for _, row in df.iloc[1:13, 2:7].iterrows(): for _, row in df.iloc[1:13, 2:7].iterrows():
if pd.notna(row[5]): if pd.notna(row[5]):
items.append({ items.append({
"sourceWHName": "WH4", "sourceWHName": "四号手套箱堆栈",
"posX": int(row[2]), "posY": int(row[3]), "posZ": int(row[4]), "posX": int(row[2]), "posY": int(row[3]), "posZ": int(row[4]),
"materialName": str(row[5]).strip(), "materialName": str(row[5]).strip(),
"quantity": float(row[6]) if pd.notna(row[6]) else 0.0, "quantity": float(row[6]) if pd.notna(row[6]) else 0.0,
@@ -205,7 +205,7 @@ class BioyondCellWorkstation(WorkstationBase):
for _, row in df.iloc[14:23, 2:9].iterrows(): for _, row in df.iloc[14:23, 2:9].iterrows():
if pd.notna(row[5]): if pd.notna(row[5]):
items.append({ items.append({
"sourceWHName": "WH4", "sourceWHName": "四号手套箱堆栈",
"posX": int(row[2]), "posY": int(row[3]), "posZ": int(row[4]), "posX": int(row[2]), "posY": int(row[3]), "posZ": int(row[4]),
"materialName": str(row[5]).strip(), "materialName": str(row[5]).strip(),
"quantity": float(row[6]) if pd.notna(row[6]) else 0.0, "quantity": float(row[6]) if pd.notna(row[6]) else 0.0,
@@ -216,7 +216,7 @@ class BioyondCellWorkstation(WorkstationBase):
for _, row in df.iloc[25:40, 2:7].iterrows(): for _, row in df.iloc[25:40, 2:7].iterrows():
if pd.notna(row[5]) or pd.notna(row[6]): if pd.notna(row[5]) or pd.notna(row[6]):
items.append({ items.append({
"sourceWHName": "WH3", "sourceWHName": "三号手套箱人工堆栈",
"posX": int(row[2]), "posY": int(row[3]), "posZ": int(row[4]), "posX": int(row[2]), "posY": int(row[3]), "posZ": int(row[4]),
"materialType": str(row[5]).strip() if pd.notna(row[5]) else "", "materialType": str(row[5]).strip() if pd.notna(row[5]) else "",
"materialId": str(row[6]).strip() if pd.notna(row[6]) else "", "materialId": str(row[6]).strip() if pd.notna(row[6]) else "",
@@ -229,28 +229,28 @@ class BioyondCellWorkstation(WorkstationBase):
if not items: if not items:
params = locals() params = locals()
for name, value in params.items(): for name, value in params.items():
if name.startswith("WH4") and "materialName" in name and value: if name.startswith("四号手套箱堆栈") and "materialName" in name and value:
idx = name.split("_") idx = name.split("_")
items.append({ items.append({
"sourceWHName": "WH4", "sourceWHName": "四号手套箱堆栈",
"posX": int(idx[1][1:]), "posY": int(idx[2][1:]), "posZ": int(idx[3][1:]), "posX": int(idx[1][1:]), "posY": int(idx[2][1:]), "posZ": int(idx[3][1:]),
"materialName": value, "materialName": value,
"quantity": float(params.get(name.replace("materialName", "quantity"), 0.0)) "quantity": float(params.get(name.replace("materialName", "quantity"), 0.0))
}) })
elif name.startswith("WH4") and "materialType" in name and (value or params.get(name.replace("materialType", "materialName"), "")): elif name.startswith("四号手套箱堆栈") and "materialType" in name and (value or params.get(name.replace("materialType", "materialName"), "")):
idx = name.split("_") idx = name.split("_")
items.append({ items.append({
"sourceWHName": "WH4", "sourceWHName": "四号手套箱堆栈",
"posX": int(idx[1][1:]), "posY": int(idx[2][1:]), "posZ": int(idx[3][1:]), "posX": int(idx[1][1:]), "posY": int(idx[2][1:]), "posZ": int(idx[3][1:]),
"materialName": params.get(name.replace("materialType", "materialName"), ""), "materialName": params.get(name.replace("materialType", "materialName"), ""),
"quantity": float(params.get(name.replace("materialType", "quantity"), 0.0)), "quantity": float(params.get(name.replace("materialType", "quantity"), 0.0)),
"materialType": value, "materialType": value,
"targetWH": params.get(name.replace("materialType", "targetWH"), ""), "targetWH": params.get(name.replace("materialType", "targetWH"), ""),
}) })
elif name.startswith("WH3") and "materialType" in name and (value or params.get(name.replace("materialType", "materialId"), "")): elif name.startswith("三号手套箱人工堆栈") and "materialType" in name and (value or params.get(name.replace("materialType", "materialId"), "")):
idx = name.split("_") idx = name.split("_")
items.append({ items.append({
"sourceWHName": "WH3", "sourceWHName": "三号手套箱人工堆栈",
"posX": int(idx[1][1:]), "posY": int(idx[2][1:]), "posZ": int(idx[3][1:]), "posX": int(idx[1][1:]), "posY": int(idx[2][1:]), "posZ": int(idx[3][1:]),
"materialType": value, "materialType": value,
"materialId": params.get(name.replace("materialType", "materialId"), ""), "materialId": params.get(name.replace("materialType", "materialId"), ""),
@@ -260,7 +260,7 @@ class BioyondCellWorkstation(WorkstationBase):
if not items: if not items:
logger.warning("没有有效的上料条目,已跳过提交。") logger.warning("没有有效的上料条目,已跳过提交。")
return {"code": 0, "message": "no valid items", "data": []} return {"code": 0, "message": "no valid items", "data": []}
logger.info(items)
return self._post_lims("/api/lims/order/auto-feeding4to3", items) return self._post_lims("/api/lims/order/auto-feeding4to3", items)
@@ -641,9 +641,13 @@ class BioyondCellWorkstation(WorkstationBase):
# -------------------------------- # --------------------------------
if __name__ == "__main__": if __name__ == "__main__":
ws = BioyondCellWorkstation() ws = BioyondCellWorkstation()
ws.scheduler_stop() # logger.info(ws.scheduler_start())
# re=ws.scheduler_start()
print(re) logger.info(ws.auto_feeding4to3())
# re=ws.scheduler_stop()
# re = ws.transfer_3_to_2_to_1()
# print(re)
# logger.info("调度启动完成") # logger.info("调度启动完成")
# ws.scheduler_continue() # ws.scheduler_continue()