mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-15 13:44:39 +00:00
1445 lines
65 KiB
Python
1445 lines
65 KiB
Python
"""
|
||
Bioyond工作站实现
|
||
Bioyond Workstation Implementation
|
||
|
||
集成Bioyond物料管理的工作站示例
|
||
"""
|
||
import time
|
||
import traceback
|
||
from datetime import datetime
|
||
from typing import Dict, Any, List, Optional, Union
|
||
import json
|
||
from pathlib import Path
|
||
|
||
from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer
|
||
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
|
||
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
|
||
from unilabos.resources.warehouse import WareHouse
|
||
from unilabos.utils.log import logger
|
||
from unilabos.resources.graphio import resource_bioyond_to_plr, resource_plr_to_bioyond
|
||
|
||
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, BaseROS2DeviceNode
|
||
from unilabos.ros.nodes.presets.workstation import ROS2WorkstationNode
|
||
from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String
|
||
from pylabrobot.resources.resource import Resource as ResourcePLR
|
||
|
||
from unilabos.devices.workstation.bioyond_studio.config import (
|
||
API_CONFIG, WORKFLOW_MAPPINGS, MATERIAL_TYPE_MAPPINGS, WAREHOUSE_MAPPING, HTTP_SERVICE_CONFIG
|
||
)
|
||
from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService
|
||
|
||
|
||
class BioyondResourceSynchronizer(ResourceSynchronizer):
|
||
"""Bioyond资源同步器
|
||
|
||
负责与Bioyond系统进行物料数据的同步
|
||
"""
|
||
|
||
def __init__(self, workstation: 'BioyondWorkstation'):
|
||
super().__init__(workstation)
|
||
self.bioyond_api_client = None
|
||
self.sync_interval = 60 # 默认60秒同步一次
|
||
self.last_sync_time = 0
|
||
self.initialize()
|
||
|
||
def initialize(self) -> bool:
|
||
"""初始化Bioyond资源同步器"""
|
||
try:
|
||
self.bioyond_api_client = self.workstation.hardware_interface
|
||
if self.bioyond_api_client is None:
|
||
logger.error("Bioyond API客户端未初始化")
|
||
return False
|
||
|
||
# 设置同步间隔
|
||
self.sync_interval = self.workstation.bioyond_config.get("sync_interval", 600)
|
||
|
||
logger.info("Bioyond资源同步器初始化完成")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"Bioyond资源同步器初始化失败: {e}")
|
||
return False
|
||
|
||
def sync_from_external(self) -> bool:
|
||
"""从Bioyond系统同步物料数据"""
|
||
try:
|
||
if self.bioyond_api_client is None:
|
||
logger.error("Bioyond API客户端未初始化")
|
||
return False
|
||
|
||
# 同时查询耗材类型(typeMode=0)、样品类型(typeMode=1)和试剂类型(typeMode=2)
|
||
all_bioyond_data = []
|
||
|
||
# 查询耗材类型物料(例如:枪头盒)
|
||
bioyond_data_type0 = self.bioyond_api_client.stock_material('{"typeMode": 0, "includeDetail": true}')
|
||
if bioyond_data_type0:
|
||
all_bioyond_data.extend(bioyond_data_type0)
|
||
logger.debug(f"从Bioyond查询到 {len(bioyond_data_type0)} 个耗材类型物料")
|
||
|
||
# 查询样品类型物料(烧杯、试剂瓶、分装板等)
|
||
bioyond_data_type1 = self.bioyond_api_client.stock_material('{"typeMode": 1, "includeDetail": true}')
|
||
if bioyond_data_type1:
|
||
all_bioyond_data.extend(bioyond_data_type1)
|
||
logger.debug(f"从Bioyond查询到 {len(bioyond_data_type1)} 个样品类型物料")
|
||
|
||
# 查询试剂类型物料(样品板、样品瓶等)
|
||
bioyond_data_type2 = self.bioyond_api_client.stock_material('{"typeMode": 2, "includeDetail": true}')
|
||
if bioyond_data_type2:
|
||
all_bioyond_data.extend(bioyond_data_type2)
|
||
logger.debug(f"从Bioyond查询到 {len(bioyond_data_type2)} 个试剂类型物料")
|
||
|
||
if not all_bioyond_data:
|
||
logger.warning("从Bioyond获取的物料数据为空")
|
||
return False
|
||
|
||
# 转换为UniLab格式
|
||
unilab_resources = resource_bioyond_to_plr(
|
||
all_bioyond_data,
|
||
type_mapping=self.workstation.bioyond_config["material_type_mappings"],
|
||
deck=self.workstation.deck
|
||
)
|
||
|
||
logger.info(f"从Bioyond同步了 {len(unilab_resources)} 个资源")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"从Bioyond同步物料数据失败: {e}")
|
||
return False
|
||
|
||
def sync_to_external(self, resource: Any) -> bool:
|
||
"""将本地物料数据变更同步到Bioyond系统"""
|
||
try:
|
||
# ✅ 跳过仓库类型的资源 - 仓库是容器,不是物料
|
||
resource_category = getattr(resource, "category", None)
|
||
if resource_category == "warehouse":
|
||
logger.debug(f"[同步→Bioyond] 跳过仓库类型资源: {resource.name} (仓库是容器,不需要同步为物料)")
|
||
return True
|
||
|
||
logger.info(f"[同步→Bioyond] 收到物料变更: {resource.name}")
|
||
|
||
# 获取物料的 Bioyond ID
|
||
extra_info = getattr(resource, "unilabos_extra", {})
|
||
material_bioyond_id = extra_info.get("material_bioyond_id")
|
||
|
||
# 🔥 查询所有物料,用于获取物料当前位置等信息
|
||
existing_materials = []
|
||
try:
|
||
import json
|
||
logger.info(f"[同步→Bioyond] 查询 Bioyond 系统中的所有物料...")
|
||
all_materials = []
|
||
|
||
for type_mode in [0, 1, 2]: # 0=耗材, 1=样品, 2=试剂
|
||
query_params = json.dumps({
|
||
"typeMode": type_mode,
|
||
"filter": "",
|
||
"includeDetail": True
|
||
})
|
||
materials = self.bioyond_api_client.stock_material(query_params)
|
||
if materials:
|
||
all_materials.extend(materials)
|
||
|
||
existing_materials = all_materials
|
||
logger.info(f"[同步→Bioyond] 查询到 {len(all_materials)} 个物料")
|
||
except Exception as e:
|
||
logger.error(f"查询 Bioyond 物料失败: {e}")
|
||
return False
|
||
|
||
# ⭐ 如果没有 Bioyond ID,尝试从查询结果中按名称匹配
|
||
if not material_bioyond_id:
|
||
logger.warning(f"[同步→Bioyond] 物料 {resource.name} 没有 Bioyond ID,尝试按名称查询...")
|
||
for mat in existing_materials:
|
||
if mat.get("name") == resource.name:
|
||
material_bioyond_id = mat.get("id")
|
||
mat_type = mat.get("typeName", "未知")
|
||
logger.info(f"✅ 找到物料 {resource.name} ({mat_type}) 的 Bioyond ID: {material_bioyond_id[:8]}...")
|
||
# 保存 ID 到资源对象
|
||
extra_info["material_bioyond_id"] = material_bioyond_id
|
||
setattr(resource, "unilabos_extra", extra_info)
|
||
break
|
||
|
||
if not material_bioyond_id:
|
||
logger.warning(f"⚠️ 在 Bioyond 系统中未找到名为 {resource.name} 的物料")
|
||
logger.info(f"[同步→Bioyond] 这是一个新物料,将创建并入库到 Bioyond 系统")
|
||
|
||
# 检查是否有位置更新请求
|
||
update_site = extra_info.get("update_resource_site")
|
||
|
||
if not update_site:
|
||
logger.debug(f"[同步→Bioyond] 物料 {resource.name} 无位置更新请求,跳过同步")
|
||
return True
|
||
|
||
# ===== 物料移动/创建流程 =====
|
||
logger.info(f"[同步→Bioyond] 📍 物料 {resource.name} 目标库位: {update_site}")
|
||
|
||
if material_bioyond_id:
|
||
logger.info(f"[同步→Bioyond] 🔄 物料已存在于 Bioyond (ID: {material_bioyond_id[:8]}...),执行移动操作")
|
||
else:
|
||
logger.info(f"[同步→Bioyond] ➕ 物料不存在于 Bioyond,将创建新物料并入库")
|
||
|
||
# 第1步:获取仓库配置
|
||
from .config import WAREHOUSE_MAPPING
|
||
warehouse_mapping = WAREHOUSE_MAPPING
|
||
|
||
# 确定目标仓库名称
|
||
parent_name = None
|
||
target_location_uuid = None
|
||
current_warehouse = None
|
||
|
||
# 🔥 优先级1: 从 Bioyond 查询结果中获取物料当前所在的仓库
|
||
if material_bioyond_id:
|
||
for mat in existing_materials:
|
||
if mat.get("name") == resource.name or mat.get("id") == material_bioyond_id:
|
||
locations = mat.get("locations", [])
|
||
if locations and len(locations) > 0:
|
||
current_warehouse = locations[0].get("whName")
|
||
logger.info(f"[同步→Bioyond] 💡 物料当前位于 Bioyond 仓库: {current_warehouse}")
|
||
break
|
||
|
||
# 优先在当前仓库中查找目标库位
|
||
if current_warehouse and current_warehouse in warehouse_mapping:
|
||
site_uuids = warehouse_mapping[current_warehouse].get("site_uuids", {})
|
||
if update_site in site_uuids:
|
||
parent_name = current_warehouse
|
||
target_location_uuid = site_uuids[update_site]
|
||
logger.info(f"[同步→Bioyond] ✅ 在当前仓库找到目标库位: {parent_name}/{update_site}")
|
||
logger.info(f"[同步→Bioyond] 目标库位UUID: {target_location_uuid[:8]}...")
|
||
else:
|
||
logger.warning(f"⚠️ [同步→Bioyond] 当前仓库 {current_warehouse} 中没有库位 {update_site},将搜索其他仓库")
|
||
|
||
# 🔥 优先级2: 检查 PLR 父节点名称
|
||
if not parent_name or not target_location_uuid:
|
||
if resource.parent is not None:
|
||
parent_name_candidate = resource.parent.name
|
||
logger.info(f"[同步→Bioyond] 从 PLR 父节点获取仓库名称: {parent_name_candidate}")
|
||
|
||
if parent_name_candidate in warehouse_mapping:
|
||
site_uuids = warehouse_mapping[parent_name_candidate].get("site_uuids", {})
|
||
if update_site in site_uuids:
|
||
parent_name = parent_name_candidate
|
||
target_location_uuid = site_uuids[update_site]
|
||
logger.info(f"[同步→Bioyond] ✅ 在父节点仓库找到目标库位: {parent_name}/{update_site}")
|
||
logger.info(f"[同步→Bioyond] 目标库位UUID: {target_location_uuid[:8]}...")
|
||
|
||
# 🔥 优先级3: 遍历所有仓库查找(兜底方案)
|
||
if not parent_name or not target_location_uuid:
|
||
logger.info(f"[同步→Bioyond] 从所有仓库中查找库位 {update_site}...")
|
||
for warehouse_name, warehouse_info in warehouse_mapping.items():
|
||
site_uuids = warehouse_info.get("site_uuids", {})
|
||
if update_site in site_uuids:
|
||
parent_name = warehouse_name
|
||
target_location_uuid = site_uuids[update_site]
|
||
logger.warning(f"[同步→Bioyond] ⚠️ 在其他仓库找到目标库位: {parent_name}/{update_site}")
|
||
logger.info(f"[同步→Bioyond] 目标库位UUID: {target_location_uuid[:8]}...")
|
||
break
|
||
|
||
if not parent_name or not target_location_uuid:
|
||
logger.error(f"❌ [同步→Bioyond] 库位 {update_site} 没有在 WAREHOUSE_MAPPING 中配置")
|
||
logger.debug(f"[同步→Bioyond] 可用仓库: {list(warehouse_mapping.keys())}")
|
||
return False
|
||
|
||
# 第2步:转换为 Bioyond 格式
|
||
logger.info(f"[同步→Bioyond] 🔄 转换物料为 Bioyond 格式...")
|
||
|
||
# 导入物料默认参数配置
|
||
from .config import MATERIAL_DEFAULT_PARAMETERS
|
||
|
||
bioyond_material = resource_plr_to_bioyond(
|
||
[resource],
|
||
type_mapping=self.workstation.bioyond_config["material_type_mappings"],
|
||
warehouse_mapping=self.workstation.bioyond_config["warehouse_mapping"],
|
||
material_params=MATERIAL_DEFAULT_PARAMETERS
|
||
)[0]
|
||
|
||
logger.info(f"[同步→Bioyond] 🔧 准备覆盖locations字段,目标仓库: {parent_name}, 库位: {update_site}, UUID: {target_location_uuid[:8]}...")
|
||
|
||
# 🔥 强制覆盖 locations 信息,使用正确的目标库位 UUID
|
||
# resource_plr_to_bioyond 可能会生成错误的仓库信息,这里直接覆盖
|
||
bioyond_material["locations"] = [{
|
||
"id": target_location_uuid,
|
||
"whid": "",
|
||
"whName": parent_name,
|
||
"x": ord(update_site[0]) - ord('A') + 1, # A→1, B→2, ...
|
||
"y": int(update_site[1:]), # 01→1, 02→2, ...
|
||
"z": 1,
|
||
"quantity": 0
|
||
}]
|
||
logger.info(f"[同步→Bioyond] ✅ 已覆盖库位信息: {parent_name}/{update_site} (UUID: {target_location_uuid[:8]}...)")
|
||
|
||
logger.debug(f"[同步→Bioyond] Bioyond 物料数据: {bioyond_material}")
|
||
|
||
location_info = bioyond_material.get("locations")
|
||
logger.debug(f"[同步→Bioyond] 库位信息: {location_info}, 类型: {type(location_info)}")
|
||
|
||
# 第3步:根据是否已有 Bioyond ID 决定创建还是使用现有物料
|
||
if material_bioyond_id:
|
||
# 物料已存在,直接使用现有 ID
|
||
material_id = material_bioyond_id
|
||
logger.info(f"✅ [同步→Bioyond] 使用已有物料 ID: {material_id[:8]}...")
|
||
else:
|
||
# 物料不存在,调用 API 创建新物料
|
||
logger.info(f"[同步→Bioyond] 📤 调用 Bioyond API 添加物料...")
|
||
material_id = self.bioyond_api_client.add_material(bioyond_material)
|
||
|
||
if not material_id:
|
||
logger.error(f"❌ [同步→Bioyond] 添加物料失败,API 返回空")
|
||
return False
|
||
|
||
logger.info(f"✅ [同步→Bioyond] 物料添加成功,Bioyond ID: {material_id[:8]}...")
|
||
|
||
# 保存新创建的物料 ID 到资源对象
|
||
extra_info["material_bioyond_id"] = material_id
|
||
setattr(resource, "unilabos_extra", extra_info)
|
||
|
||
# 第4步:物料入库前先检查目标库位是否被占用
|
||
if location_info:
|
||
logger.info(f"[同步→Bioyond] 📥 准备入库到库位 {update_site}...")
|
||
|
||
# 处理不同的 location_info 数据结构
|
||
if isinstance(location_info, list) and len(location_info) > 0:
|
||
location_id = location_info[0]["id"]
|
||
elif isinstance(location_info, dict):
|
||
location_id = location_info["id"]
|
||
else:
|
||
logger.warning(f"⚠️ [同步→Bioyond] 无效的库位信息格式: {location_info}")
|
||
location_id = None
|
||
|
||
if location_id:
|
||
# 查询目标库位是否已有物料
|
||
logger.info(f"[同步→Bioyond] 🔍 检查库位 {update_site} (UUID: {location_id[:8]}...) 是否被占用...")
|
||
|
||
# 查询所有物料,检查是否有物料在目标库位
|
||
try:
|
||
all_materials_type1 = self.bioyond_api_client.stock_material('{"typeMode": 1, "includeDetail": true}')
|
||
all_materials_type2 = self.bioyond_api_client.stock_material('{"typeMode": 2, "includeDetail": true}')
|
||
all_materials = (all_materials_type1 or []) + (all_materials_type2 or [])
|
||
|
||
# 检查是否有物料已经在目标库位
|
||
location_occupied = False
|
||
occupying_material = None
|
||
|
||
# 同时检查当前物料是否在其他位置(需要先出库)
|
||
current_material_location = None
|
||
current_location_uuid = None
|
||
|
||
for material in all_materials:
|
||
locations = material.get("locations", [])
|
||
|
||
# 检查目标库位占用情况
|
||
for loc in locations:
|
||
if loc.get("id") == location_id:
|
||
location_occupied = True
|
||
occupying_material = material
|
||
logger.warning(f"⚠️ [同步→Bioyond] 库位 {update_site} 已被占用!")
|
||
logger.warning(f" 占用物料: {material.get('name')} (ID: {material.get('id', '')[:8]}...)")
|
||
logger.warning(f" 占用位置: code={loc.get('code')}, x={loc.get('x')}, y={loc.get('y')}")
|
||
logger.warning(f" 🔍 详细信息: location_id={loc.get('id')[:8]}..., 目标UUID={location_id[:8]}...")
|
||
logger.warning(f" 🔍 完整location数据: {loc}")
|
||
break
|
||
|
||
# 检查当前物料是否在其他位置
|
||
if material.get("id") == material_id and locations:
|
||
current_material_location = locations[0]
|
||
current_location_uuid = current_material_location.get("id")
|
||
logger.info(f"📍 [同步→Bioyond] 物料当前位置: {current_material_location.get('whName')}/{current_material_location.get('code')} (UUID: {current_location_uuid[:8]}...)")
|
||
|
||
if location_occupied:
|
||
break
|
||
|
||
if location_occupied:
|
||
# 如果是同一个物料(ID相同),说明已经在目标位置了,跳过
|
||
if occupying_material and occupying_material.get("id") == material_id:
|
||
logger.info(f"✅ [同步→Bioyond] 物料 {resource.name} 已经在库位 {update_site},跳过重复入库")
|
||
return True
|
||
else:
|
||
logger.error(f"❌ [同步→Bioyond] 库位 {update_site} 已被其他物料占用,拒绝入库")
|
||
return False
|
||
|
||
logger.info(f"✅ [同步→Bioyond] 库位 {update_site} 可用,准备入库...")
|
||
|
||
except Exception as e:
|
||
logger.warning(f"⚠️ [同步→Bioyond] 检查库位状态时发生异常: {e},继续尝试入库...")
|
||
|
||
# 🔧 如果物料当前在其他位置,先出库再入库
|
||
if current_location_uuid and current_location_uuid != location_id:
|
||
logger.info(f"[同步→Bioyond] 🚚 物料需要移动,先从当前位置出库...")
|
||
logger.info(f" 当前位置 UUID: {current_location_uuid[:8]}...")
|
||
logger.info(f" 目标位置 UUID: {location_id[:8]}...")
|
||
|
||
try:
|
||
# 获取物料数量用于出库
|
||
material_quantity = current_material_location.get("totalNumber", 1)
|
||
logger.info(f" 出库数量: {material_quantity}")
|
||
|
||
# 调用出库 API
|
||
outbound_response = self.bioyond_api_client.material_outbound_by_id(
|
||
material_id,
|
||
current_location_uuid,
|
||
material_quantity
|
||
)
|
||
logger.info(f"✅ [同步→Bioyond] 物料从 {current_material_location.get('code')} 出库成功")
|
||
except Exception as e:
|
||
logger.error(f"❌ [同步→Bioyond] 物料出库失败: {e}")
|
||
return False
|
||
|
||
# 执行入库
|
||
logger.info(f"[同步→Bioyond] 📥 调用 Bioyond API 物料入库...")
|
||
response = self.bioyond_api_client.material_inbound(material_id, location_id)
|
||
|
||
# 注意:Bioyond API 成功时返回空字典 {},所以不能用 if not response 判断
|
||
# 只要没有抛出异常,就认为成功(response 是 dict 类型,即使是 {} 也不是 None)
|
||
if response is not None:
|
||
logger.info(f"✅ [同步→Bioyond] 物料 {resource.name} 成功入库到 {update_site}")
|
||
|
||
# 入库成功后,重新查询验证物料实际入库位置
|
||
logger.info(f"[同步→Bioyond] 🔍 验证物料实际入库位置...")
|
||
try:
|
||
all_materials_type1 = self.bioyond_api_client.stock_material('{"typeMode": 1, "includeDetail": true}')
|
||
all_materials_type2 = self.bioyond_api_client.stock_material('{"typeMode": 2, "includeDetail": true}')
|
||
all_materials = (all_materials_type1 or []) + (all_materials_type2 or [])
|
||
|
||
for material in all_materials:
|
||
if material.get("id") == material_id:
|
||
locations = material.get("locations", [])
|
||
if locations:
|
||
actual_loc = locations[0]
|
||
logger.info(f"📍 [同步→Bioyond] 物料实际位置: code={actual_loc.get('code')}, "
|
||
f"warehouse={actual_loc.get('whName')}, "
|
||
f"x={actual_loc.get('x')}, y={actual_loc.get('y')}")
|
||
|
||
# 验证 UUID 是否匹配
|
||
if actual_loc.get("id") != location_id:
|
||
logger.error(f"❌ [同步→Bioyond] UUID 不匹配!")
|
||
logger.error(f" 预期 UUID: {location_id}")
|
||
logger.error(f" 实际 UUID: {actual_loc.get('id')}")
|
||
logger.error(f" 这说明配置文件中的 UUID 映射有误,请检查 config.py 中的 WAREHOUSE_MAPPING")
|
||
break
|
||
except Exception as e:
|
||
logger.warning(f"⚠️ [同步→Bioyond] 验证入库位置时发生异常: {e}")
|
||
else:
|
||
logger.error(f"❌ [同步→Bioyond] 物料入库失败")
|
||
return False
|
||
else:
|
||
logger.warning(f"⚠️ [同步→Bioyond] 无法获取库位 ID,跳过入库操作")
|
||
else:
|
||
logger.warning(f"⚠️ [同步→Bioyond] 物料没有库位信息,跳过入库操作")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ [同步→Bioyond] 同步物料 {resource.name} 时发生异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def handle_external_change(self, change_info: Dict[str, Any]) -> bool:
|
||
"""处理Bioyond系统的变更通知"""
|
||
try:
|
||
# 这里可以实现对Bioyond变更的处理逻辑
|
||
logger.info(f"处理Bioyond变更通知: {change_info}")
|
||
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"处理Bioyond变更通知失败: {e}")
|
||
return False
|
||
|
||
def _create_material_only(self, resource: Any) -> Optional[str]:
|
||
"""只创建物料到 Bioyond 系统(不入库)
|
||
|
||
Transfer 阶段使用:只调用 add_material API 创建物料记录
|
||
|
||
Args:
|
||
resource: 要创建的资源对象
|
||
|
||
Returns:
|
||
str: 创建成功返回 Bioyond 物料 ID,失败返回 None
|
||
"""
|
||
try:
|
||
# 跳过仓库类型的资源
|
||
resource_category = getattr(resource, "category", None)
|
||
if resource_category == "warehouse":
|
||
logger.debug(f"[创建物料] 跳过仓库类型资源: {resource.name}")
|
||
return None
|
||
|
||
logger.info(f"[创建物料] 开始创建物料: {resource.name}")
|
||
|
||
# 检查是否已经有 Bioyond ID
|
||
extra_info = getattr(resource, "unilabos_extra", {})
|
||
material_bioyond_id = extra_info.get("material_bioyond_id")
|
||
|
||
if material_bioyond_id:
|
||
logger.info(f"[创建物料] 物料 {resource.name} 已存在 (ID: {material_bioyond_id[:8]}...),跳过创建")
|
||
return material_bioyond_id
|
||
|
||
# 转换为 Bioyond 格式
|
||
from .config import MATERIAL_DEFAULT_PARAMETERS
|
||
|
||
bioyond_material = resource_plr_to_bioyond(
|
||
[resource],
|
||
type_mapping=self.workstation.bioyond_config["material_type_mappings"],
|
||
warehouse_mapping=self.workstation.bioyond_config["warehouse_mapping"],
|
||
material_params=MATERIAL_DEFAULT_PARAMETERS
|
||
)[0]
|
||
|
||
# ⚠️ 关键:创建物料时不设置 locations,让 Bioyond 系统暂不分配库位
|
||
# locations 字段在后续的入库操作中才会指定
|
||
bioyond_material.pop("locations", None)
|
||
|
||
logger.info(f"[创建物料] 调用 Bioyond API 创建物料(不指定库位)...")
|
||
material_id = self.bioyond_api_client.add_material(bioyond_material)
|
||
|
||
if not material_id:
|
||
logger.error(f"[创建物料] 创建物料失败,API 返回空")
|
||
return None
|
||
|
||
logger.info(f"✅ [创建物料] 物料创建成功,ID: {material_id[:8]}...")
|
||
|
||
# 保存 Bioyond ID 到资源对象
|
||
extra_info["material_bioyond_id"] = material_id
|
||
setattr(resource, "unilabos_extra", extra_info)
|
||
|
||
return material_id
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ [创建物料] 创建物料 {resource.name} 时发生异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
def _inbound_material_only(self, resource: Any, material_id: str) -> bool:
|
||
"""只执行物料入库操作(物料已存在于 Bioyond 系统)
|
||
|
||
Add 阶段使用:调用 material_inbound API 将物料入库到指定库位
|
||
|
||
Args:
|
||
resource: 要入库的资源对象
|
||
material_id: Bioyond 物料 ID
|
||
|
||
Returns:
|
||
bool: 入库成功返回 True,失败返回 False
|
||
"""
|
||
try:
|
||
logger.info(f"[物料入库] 开始入库物料: {resource.name} (ID: {material_id[:8]}...)")
|
||
|
||
# 获取目标库位信息
|
||
extra_info = getattr(resource, "unilabos_extra", {})
|
||
update_site = extra_info.get("update_resource_site")
|
||
|
||
if not update_site:
|
||
logger.warning(f"[物料入库] 物料 {resource.name} 没有指定目标库位,跳过入库")
|
||
return True
|
||
|
||
logger.info(f"[物料入库] 目标库位: {update_site}")
|
||
|
||
# 获取仓库配置和目标库位 UUID
|
||
from .config import WAREHOUSE_MAPPING
|
||
warehouse_mapping = WAREHOUSE_MAPPING
|
||
|
||
parent_name = None
|
||
target_location_uuid = None
|
||
|
||
# 查找目标库位的 UUID
|
||
if resource.parent is not None:
|
||
parent_name_candidate = resource.parent.name
|
||
if parent_name_candidate in warehouse_mapping:
|
||
site_uuids = warehouse_mapping[parent_name_candidate].get("site_uuids", {})
|
||
if update_site in site_uuids:
|
||
parent_name = parent_name_candidate
|
||
target_location_uuid = site_uuids[update_site]
|
||
logger.info(f"[物料入库] 从父节点找到库位: {parent_name}/{update_site}")
|
||
|
||
# 兜底:遍历所有仓库查找
|
||
if not target_location_uuid:
|
||
for warehouse_name, warehouse_info in warehouse_mapping.items():
|
||
site_uuids = warehouse_info.get("site_uuids", {})
|
||
if update_site in site_uuids:
|
||
parent_name = warehouse_name
|
||
target_location_uuid = site_uuids[update_site]
|
||
logger.info(f"[物料入库] 从所有仓库找到库位: {parent_name}/{update_site}")
|
||
break
|
||
|
||
if not target_location_uuid:
|
||
logger.error(f"❌ [物料入库] 库位 {update_site} 未在配置中找到")
|
||
return False
|
||
|
||
logger.info(f"[物料入库] 库位 UUID: {target_location_uuid[:8]}...")
|
||
|
||
# 调用入库 API
|
||
logger.info(f"[物料入库] 调用 Bioyond API 执行入库...")
|
||
response = self.bioyond_api_client.material_inbound(material_id, target_location_uuid)
|
||
|
||
if response: # 空字典 {} 表示失败,非空字典表示成功
|
||
logger.info(f"✅ [物料入库] 物料 {resource.name} 成功入库到 {update_site}")
|
||
return True
|
||
else:
|
||
logger.error(f"❌ [物料入库] 物料入库失败,API返回空响应或失败")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ [物料入库] 入库物料 {resource.name} 时发生异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
|
||
class BioyondWorkstation(WorkstationBase):
|
||
"""Bioyond工作站
|
||
|
||
集成Bioyond物料管理的工作站实现
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
bioyond_config: Optional[Dict[str, Any]] = None,
|
||
deck: Optional[Any] = None,
|
||
*args,
|
||
**kwargs,
|
||
):
|
||
# 初始化父类
|
||
super().__init__(
|
||
# 桌子
|
||
deck=deck,
|
||
*args,
|
||
**kwargs,
|
||
)
|
||
|
||
# 检查 deck 是否为 None,防止 AttributeError
|
||
if self.deck is None:
|
||
logger.error("❌ Deck 配置为空,请检查配置文件中的 deck 参数")
|
||
raise ValueError("Deck 配置不能为空,请在配置文件中添加正确的 deck 配置")
|
||
|
||
# 初始化 warehouses 属性
|
||
self.deck.warehouses = {}
|
||
for resource in self.deck.children:
|
||
if isinstance(resource, WareHouse):
|
||
self.deck.warehouses[resource.name] = resource
|
||
|
||
# 创建通信模块
|
||
self._create_communication_module(bioyond_config)
|
||
self.resource_synchronizer = BioyondResourceSynchronizer(self)
|
||
self.resource_synchronizer.sync_from_external()
|
||
|
||
# TODO: self._ros_node里面拿属性
|
||
|
||
# 工作流加载
|
||
self.is_running = False
|
||
self.workflow_mappings = {}
|
||
self.workflow_sequence = []
|
||
self.pending_task_params = []
|
||
|
||
if "workflow_mappings" in bioyond_config:
|
||
self._set_workflow_mappings(bioyond_config["workflow_mappings"])
|
||
|
||
# 准备 HTTP 报送接收服务配置(延迟到 post_init 启动)
|
||
# 从 bioyond_config 中获取,如果没有则使用 HTTP_SERVICE_CONFIG 的默认值
|
||
self._http_service_config = {
|
||
"host": bioyond_config.get("http_service_host", HTTP_SERVICE_CONFIG["http_service_host"]),
|
||
"port": bioyond_config.get("http_service_port", HTTP_SERVICE_CONFIG["http_service_port"])
|
||
}
|
||
self.http_service = None # 将在 post_init 中启动
|
||
|
||
logger.info(f"Bioyond工作站初始化完成")
|
||
|
||
def __del__(self):
|
||
"""析构函数:清理资源,停止 HTTP 服务"""
|
||
try:
|
||
if hasattr(self, 'http_service') and self.http_service is not None:
|
||
logger.info("正在停止 HTTP 报送服务...")
|
||
self.http_service.stop()
|
||
except Exception as e:
|
||
logger.error(f"停止 HTTP 服务时发生错误: {e}")
|
||
|
||
def post_init(self, ros_node: ROS2WorkstationNode):
|
||
self._ros_node = ros_node
|
||
|
||
# 启动 HTTP 报送接收服务(现在 device_id 已可用)
|
||
if hasattr(self, '_http_service_config'):
|
||
try:
|
||
self.http_service = WorkstationHTTPService(
|
||
workstation_instance=self,
|
||
host=self._http_service_config["host"],
|
||
port=self._http_service_config["port"]
|
||
)
|
||
self.http_service.start()
|
||
logger.info(f"Bioyond工作站HTTP报送服务已启动: {self.http_service.service_url}")
|
||
except Exception as e:
|
||
logger.error(f"启动HTTP报送服务失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
self.http_service = None
|
||
|
||
# ⭐ 上传 deck(包括所有 warehouses 及其中的物料)
|
||
# 注意:如果有从 Bioyond 同步的物料,它们已经被放置到 warehouse 中了
|
||
# 所以只需要上传 deck,物料会作为 warehouse 的 children 一起上传
|
||
logger.info("正在上传 deck(包括 warehouses 和物料)到云端...")
|
||
ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{
|
||
"resources": [self.deck]
|
||
})
|
||
|
||
# 清理临时变量(物料已经在 deck 的 warehouse children 中,不需要单独上传)
|
||
if hasattr(self, "_synced_resources"):
|
||
logger.info(f"✅ {len(self._synced_resources)} 个从Bioyond同步的物料已包含在 deck 中")
|
||
self._synced_resources = []
|
||
|
||
def transfer_resource_to_another(self, resource: List[ResourceSlot], mount_resource: List[ResourceSlot], sites: List[str], mount_device_id: DeviceSlot):
|
||
future = ROS2DeviceNode.run_async_func(self._ros_node.transfer_resource_to_another, True, **{
|
||
"plr_resources": resource,
|
||
"target_device_id": mount_device_id,
|
||
"target_resources": mount_resource,
|
||
"sites": sites,
|
||
})
|
||
return future
|
||
|
||
def _create_communication_module(self, config: Optional[Dict[str, Any]] = None) -> None:
|
||
"""创建Bioyond通信模块"""
|
||
# 创建默认配置
|
||
default_config = {
|
||
**API_CONFIG,
|
||
"workflow_mappings": WORKFLOW_MAPPINGS,
|
||
"material_type_mappings": MATERIAL_TYPE_MAPPINGS,
|
||
"warehouse_mapping": WAREHOUSE_MAPPING
|
||
}
|
||
|
||
# 如果传入了 config,合并配置(config 中的值会覆盖默认值)
|
||
if config:
|
||
self.bioyond_config = {**default_config, **config}
|
||
else:
|
||
self.bioyond_config = default_config
|
||
|
||
self.hardware_interface = BioyondV1RPC(self.bioyond_config)
|
||
|
||
def resource_tree_add(self, resources: List[ResourcePLR]) -> None:
|
||
"""添加资源到资源树并更新ROS节点
|
||
|
||
Args:
|
||
resources (List[ResourcePLR]): 要添加的资源列表
|
||
"""
|
||
logger.info(f"[resource_tree_add] 开始同步 {len(resources)} 个资源到 Bioyond 系统")
|
||
for resource in resources:
|
||
try:
|
||
# 🔍 检查资源是否已有 Bioyond ID
|
||
extra_info = getattr(resource, "unilabos_extra", {})
|
||
material_bioyond_id = extra_info.get("material_bioyond_id")
|
||
|
||
if material_bioyond_id:
|
||
# ⭐ 已有 Bioyond ID,说明 transfer 已经创建了物料
|
||
# 现在只需要执行入库操作
|
||
logger.info(f"✅ [resource_tree_add] 物料 {resource.name} 已有 Bioyond ID ({material_bioyond_id[:8]}...),执行入库操作")
|
||
self.resource_synchronizer._inbound_material_only(resource, material_bioyond_id)
|
||
else:
|
||
# ⚠️ 没有 Bioyond ID,说明是直接添加的物料(兜底逻辑)
|
||
# 需要先创建再入库
|
||
logger.info(f"⚠️ [resource_tree_add] 物料 {resource.name} 无 Bioyond ID,执行创建+入库操作")
|
||
self.resource_synchronizer.sync_to_external(resource)
|
||
|
||
except Exception as e:
|
||
logger.error(f"[resource_tree_add] 同步资源失败 {resource}: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def resource_tree_remove(self, resources: List[ResourcePLR]) -> None:
|
||
"""处理资源删除时的同步(出库操作)
|
||
|
||
当 UniLab 前端删除物料时,需要将删除操作同步到 Bioyond 系统(出库)
|
||
|
||
Args:
|
||
resources: 要删除的资源列表
|
||
"""
|
||
logger.info(f"[resource_tree_remove] 收到 {len(resources)} 个资源的移除请求(出库操作)")
|
||
|
||
# ⭐ 关键优化:先找出所有的顶层容器(BottleCarrier),只对它们进行出库
|
||
# 因为在 Bioyond 中,容器(如分装板 1105-12)是一个完整的物料
|
||
# 里面的小瓶子是它的 detail 字段,不需要单独出库
|
||
|
||
top_level_resources = []
|
||
child_resource_names = set()
|
||
|
||
# 第一步:识别所有子资源的名称
|
||
for resource in resources:
|
||
resource_category = getattr(resource, "category", None)
|
||
if resource_category == "bottle_carrier":
|
||
children = list(resource.children) if hasattr(resource, 'children') else []
|
||
for child in children:
|
||
child_resource_names.add(child.name)
|
||
|
||
# 第二步:筛选出顶层资源(不是任何容器的子资源)
|
||
for resource in resources:
|
||
resource_category = getattr(resource, "category", None)
|
||
|
||
# 跳过仓库类型的资源
|
||
if resource_category == "warehouse":
|
||
logger.debug(f"[resource_tree_remove] 跳过仓库类型资源: {resource.name}")
|
||
continue
|
||
|
||
# 如果是容器,它就是顶层资源
|
||
if resource_category == "bottle_carrier":
|
||
top_level_resources.append(resource)
|
||
logger.info(f"[resource_tree_remove] 识别到顶层容器资源: {resource.name}")
|
||
# 如果不是任何容器的子资源,它也是顶层资源
|
||
elif resource.name not in child_resource_names:
|
||
top_level_resources.append(resource)
|
||
logger.info(f"[resource_tree_remove] 识别到顶层独立资源: {resource.name}")
|
||
else:
|
||
logger.debug(f"[resource_tree_remove] 跳过子资源(将随容器一起出库): {resource.name}")
|
||
|
||
logger.info(f"[resource_tree_remove] 实际需要处理的顶层资源: {len(top_level_resources)} 个")
|
||
|
||
# 第三步:对每个顶层资源执行出库操作
|
||
for resource in top_level_resources:
|
||
try:
|
||
self._outbound_single_resource(resource)
|
||
except Exception as e:
|
||
logger.error(f"❌ [resource_tree_remove] 处理资源 {resource.name} 出库失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
logger.info(f"[resource_tree_remove] 资源移除(出库)操作完成")
|
||
|
||
def _outbound_single_resource(self, resource: ResourcePLR) -> bool:
|
||
"""对单个资源执行 Bioyond 出库操作
|
||
|
||
Args:
|
||
resource: 要出库的资源
|
||
|
||
Returns:
|
||
bool: 出库是否成功
|
||
"""
|
||
try:
|
||
logger.info(f"[resource_tree_remove] 🎯 开始处理资源出库: {resource.name}")
|
||
|
||
# 获取资源的 Bioyond 信息
|
||
extra_info = getattr(resource, "unilabos_extra", {})
|
||
material_bioyond_id = extra_info.get("material_bioyond_id")
|
||
material_bioyond_name = extra_info.get("material_bioyond_name") # ⭐ 原始 Bioyond 名称
|
||
|
||
# ⭐ 优先使用保存的 Bioyond ID,避免重复查询
|
||
if material_bioyond_id:
|
||
logger.info(f"✅ [resource_tree_remove] 从资源中获取到 Bioyond ID: {material_bioyond_id[:8]}...")
|
||
if material_bioyond_name and material_bioyond_name != resource.name:
|
||
logger.info(f" 原始 Bioyond 名称: {material_bioyond_name} (当前名称: {resource.name})")
|
||
else:
|
||
# 如果没有 Bioyond ID,尝试按名称查询
|
||
logger.info(f"[resource_tree_remove] 资源 {resource.name} 没有保存 Bioyond ID,尝试查询...")
|
||
|
||
# ⭐ 优先使用保存的原始 Bioyond 名称,如果没有则使用当前名称
|
||
query_name = material_bioyond_name if material_bioyond_name else resource.name
|
||
logger.info(f"[resource_tree_remove] 查询 Bioyond 系统中的物料: {query_name}")
|
||
|
||
# 查询所有类型的物料:0=耗材, 1=样品, 2=试剂
|
||
all_materials = []
|
||
for type_mode in [0, 1, 2]:
|
||
query_params = json.dumps({
|
||
"typeMode": type_mode,
|
||
"filter": query_name, # ⭐ 使用原始 Bioyond 名称查询
|
||
"includeDetail": True
|
||
})
|
||
materials = self.hardware_interface.stock_material(query_params)
|
||
if materials:
|
||
all_materials.extend(materials)
|
||
|
||
# 精确匹配物料名称
|
||
matched_material = None
|
||
for mat in all_materials:
|
||
if mat.get("name") == query_name:
|
||
matched_material = mat
|
||
material_bioyond_id = mat.get("id")
|
||
logger.info(f"✅ [resource_tree_remove] 找到物料 {query_name} 的 Bioyond ID: {material_bioyond_id[:8]}...")
|
||
break
|
||
|
||
if not matched_material:
|
||
logger.warning(f"⚠️ [resource_tree_remove] Bioyond 系统中未找到物料: {query_name}")
|
||
logger.info(f"[resource_tree_remove] 该物料可能尚未入库或已被删除,跳过出库操作")
|
||
return True
|
||
|
||
# 获取物料当前所在的库位信息
|
||
logger.info(f"[resource_tree_remove] 📍 查询物料的库位信息...")
|
||
|
||
# 重新查询物料详情以获取最新的库位信息
|
||
all_materials_type1 = self.hardware_interface.stock_material('{"typeMode": 1, "includeDetail": true}')
|
||
all_materials_type2 = self.hardware_interface.stock_material('{"typeMode": 2, "includeDetail": true}')
|
||
all_materials_type0 = self.hardware_interface.stock_material('{"typeMode": 0, "includeDetail": true}')
|
||
all_materials = (all_materials_type0 or []) + (all_materials_type1 or []) + (all_materials_type2 or [])
|
||
|
||
location_id = None
|
||
current_quantity = 0
|
||
|
||
for material in all_materials:
|
||
if material.get("id") == material_bioyond_id:
|
||
locations = material.get("locations", [])
|
||
if locations:
|
||
# 取第一个库位
|
||
location = locations[0]
|
||
location_id = location.get("id")
|
||
current_quantity = location.get("quantity", 1)
|
||
logger.info(f"📍 [resource_tree_remove] 物料位于库位:")
|
||
logger.info(f" - 库位代码: {location.get('code')}")
|
||
logger.info(f" - 仓库名称: {location.get('whName')}")
|
||
logger.info(f" - 数量: {current_quantity}")
|
||
logger.info(f" - 库位ID: {location_id[:8]}...")
|
||
break
|
||
else:
|
||
logger.warning(f"⚠️ [resource_tree_remove] 物料没有库位信息,可能尚未入库")
|
||
return True
|
||
|
||
if not location_id:
|
||
logger.warning(f"⚠️ [resource_tree_remove] 无法获取物料的库位信息,跳过出库")
|
||
return False
|
||
|
||
# 调用 Bioyond 出库 API
|
||
logger.info(f"[resource_tree_remove] 📤 调用 Bioyond API 出库物料...")
|
||
logger.info(f" UniLab 名称: {resource.name}")
|
||
if material_bioyond_name and material_bioyond_name != resource.name:
|
||
logger.info(f" Bioyond 名称: {material_bioyond_name}")
|
||
logger.info(f" 物料ID: {material_bioyond_id[:8]}...")
|
||
logger.info(f" 库位ID: {location_id[:8]}...")
|
||
logger.info(f" 出库数量: {current_quantity}")
|
||
|
||
response = self.hardware_interface.material_outbound_by_id(
|
||
material_id=material_bioyond_id,
|
||
location_id=location_id,
|
||
quantity=current_quantity
|
||
)
|
||
|
||
if response is not None:
|
||
logger.info(f"✅ [resource_tree_remove] 物料成功从 Bioyond 系统出库")
|
||
return True
|
||
else:
|
||
logger.error(f"❌ [resource_tree_remove] 物料出库失败,API 返回空")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ [resource_tree_remove] 物料 {resource.name} 出库时发生异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return False
|
||
|
||
def resource_tree_transfer(self, old_parent: Optional[ResourcePLR], resource: ResourcePLR, new_parent: ResourcePLR) -> None:
|
||
"""处理资源在设备间迁移时的同步
|
||
|
||
当资源从一个设备迁移到 BioyondWorkstation 时,只创建物料(不入库)
|
||
入库操作由后续的 resource_tree_add 完成
|
||
|
||
Args:
|
||
old_parent: 资源的原父节点(可能为 None)
|
||
resource: 要迁移的资源
|
||
new_parent: 资源的新父节点
|
||
"""
|
||
logger.info(f"[resource_tree_transfer] 资源迁移: {resource.name}")
|
||
logger.info(f" 旧父节点: {old_parent.name if old_parent else 'None'}")
|
||
logger.info(f" 新父节点: {new_parent.name}")
|
||
|
||
try:
|
||
# ⭐ Transfer 阶段:只创建物料到 Bioyond 系统,不执行入库
|
||
logger.info(f"[resource_tree_transfer] 开始创建物料 {resource.name} 到 Bioyond 系统(不入库)")
|
||
result = self.resource_synchronizer._create_material_only(resource)
|
||
|
||
if result:
|
||
logger.info(f"✅ [resource_tree_transfer] 物料 {resource.name} 创建成功,Bioyond ID: {result[:8]}...")
|
||
else:
|
||
logger.warning(f"⚠️ [resource_tree_transfer] 物料 {resource.name} 创建失败")
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ [resource_tree_transfer] 资源 {resource.name} 创建异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
def resource_tree_update(self, resources: List[ResourcePLR]) -> None:
|
||
"""处理资源更新时的同步(位置移动、属性修改等)
|
||
|
||
当 UniLab 前端更新物料信息时(如修改位置),需要将更新操作同步到 Bioyond 系统
|
||
|
||
Args:
|
||
resources: 要更新的资源列表
|
||
"""
|
||
logger.info(f"[resource_tree_update] 开始同步 {len(resources)} 个资源更新到 Bioyond 系统")
|
||
|
||
for resource in resources:
|
||
try:
|
||
logger.info(f"[resource_tree_update] 同步资源更新: {resource.name}")
|
||
|
||
# 调用同步器的 sync_to_external 方法
|
||
# 该方法会检查 unilabos_extra 中的 update_resource_site 字段
|
||
# 如果存在,会执行位置移动操作
|
||
result = self.resource_synchronizer.sync_to_external(resource)
|
||
|
||
if result:
|
||
logger.info(f"✅ [resource_tree_update] 资源 {resource.name} 成功同步到 Bioyond 系统")
|
||
else:
|
||
logger.warning(f"⚠️ [resource_tree_update] 资源 {resource.name} 同步到 Bioyond 系统失败")
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ [resource_tree_update] 同步资源 {resource.name} 时发生异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
logger.info(f"[resource_tree_update] 资源更新同步完成")
|
||
|
||
@property
|
||
def bioyond_status(self) -> Dict[str, Any]:
|
||
"""获取 Bioyond 系统状态信息
|
||
|
||
这个属性被 ROS 节点用来发布设备状态
|
||
|
||
Returns:
|
||
Dict[str, Any]: Bioyond 系统的状态信息
|
||
- 连接成功时返回 {"connected": True}
|
||
- 连接失败时返回 {"connected": False, "error": "错误信息"}
|
||
"""
|
||
try:
|
||
# 检查硬件接口是否存在
|
||
if not self.hardware_interface:
|
||
return {"connected": False, "error": "hardware_interface not initialized"}
|
||
|
||
# 尝试获取调度器状态来验证连接
|
||
scheduler_status = self.hardware_interface.scheduler_status()
|
||
|
||
# 如果能成功获取状态,说明连接正常
|
||
if scheduler_status:
|
||
return {"connected": True}
|
||
else:
|
||
return {"connected": False, "error": "scheduler_status returned None"}
|
||
|
||
except Exception as e:
|
||
logger.warning(f"获取Bioyond状态失败: {e}")
|
||
return {"connected": False, "error": str(e)}
|
||
|
||
# ==================== 工作流合并与参数设置 API ====================
|
||
|
||
def append_to_workflow_sequence(self, web_workflow_name: str) -> bool:
|
||
# 检查是否为JSON格式的字符串
|
||
actual_workflow_name = web_workflow_name
|
||
if web_workflow_name.startswith('{') and web_workflow_name.endswith('}'):
|
||
try:
|
||
data = json.loads(web_workflow_name)
|
||
actual_workflow_name = data.get("web_workflow_name", web_workflow_name)
|
||
print(f"解析JSON格式工作流名称: {web_workflow_name} -> {actual_workflow_name}")
|
||
except json.JSONDecodeError:
|
||
print(f"JSON解析失败,使用原始字符串: {web_workflow_name}")
|
||
|
||
workflow_id = self._get_workflow(actual_workflow_name)
|
||
if workflow_id:
|
||
self.workflow_sequence.append(workflow_id)
|
||
print(f"添加工作流到执行顺序: {actual_workflow_name} -> {workflow_id}")
|
||
return True
|
||
return False
|
||
|
||
def set_workflow_sequence(self, json_str: str) -> List[str]:
|
||
try:
|
||
data = json.loads(json_str)
|
||
web_workflow_names = data.get("web_workflow_names", [])
|
||
except:
|
||
return []
|
||
|
||
sequence = []
|
||
for web_name in web_workflow_names:
|
||
workflow_id = self._get_workflow(web_name)
|
||
if workflow_id:
|
||
sequence.append(workflow_id)
|
||
|
||
def get_all_workflows(self) -> Dict[str, str]:
|
||
return self.workflow_mappings.copy()
|
||
|
||
def _get_workflow(self, web_workflow_name: str) -> str:
|
||
if web_workflow_name not in self.workflow_mappings:
|
||
print(f"未找到工作流映射配置: {web_workflow_name}")
|
||
return ""
|
||
workflow_id = self.workflow_mappings[web_workflow_name]
|
||
print(f"获取工作流: {web_workflow_name} -> {workflow_id}")
|
||
return workflow_id
|
||
|
||
def _set_workflow_mappings(self, mappings: Dict[str, str]):
|
||
self.workflow_mappings = mappings
|
||
print(f"设置工作流映射配置: {mappings}")
|
||
|
||
def process_web_workflows(self, json_str: str) -> Dict[str, str]:
|
||
try:
|
||
data = json.loads(json_str)
|
||
web_workflow_list = data.get("web_workflow_list", [])
|
||
except json.JSONDecodeError:
|
||
print(f"无效的JSON字符串: {json_str}")
|
||
return {}
|
||
result = {}
|
||
|
||
self.workflow_sequence = []
|
||
for web_name in web_workflow_list:
|
||
workflow_id = self._get_workflow(web_name)
|
||
if workflow_id:
|
||
result[web_name] = workflow_id
|
||
self.workflow_sequence.append(workflow_id)
|
||
else:
|
||
print(f"无法获取工作流ID: {web_name}")
|
||
print(f"工作流执行顺序: {self.workflow_sequence}")
|
||
return result
|
||
|
||
def clear_workflows(self):
|
||
self.workflow_sequence = []
|
||
print("清空工作流执行顺序")
|
||
|
||
# ==================== 基础物料管理接口 ====================
|
||
|
||
# ============ 工作站状态管理 ============
|
||
def get_station_info(self) -> Dict[str, Any]:
|
||
"""获取工作站基础信息
|
||
|
||
Returns:
|
||
Dict[str, Any]: 工作站基础信息,包括设备ID、状态等
|
||
"""
|
||
return {
|
||
"device_id": getattr(self._ros_node, 'device_id', 'unknown'),
|
||
"station_type": "BioyondWorkstation",
|
||
"workflow_status": self.current_workflow_status.value if hasattr(self, 'current_workflow_status') else "unknown",
|
||
"is_busy": getattr(self, 'is_busy', False),
|
||
"deck_info": {
|
||
"name": self.deck.name if self.deck and hasattr(self.deck, 'name') else "unknown",
|
||
"children_count": len(self.deck.children) if self.deck and hasattr(self.deck, 'children') else 0
|
||
} if self.deck else None,
|
||
"hardware_interface": type(self.hardware_interface).__name__ if self.hardware_interface else None
|
||
}
|
||
|
||
def get_workstation_status(self) -> Dict[str, Any]:
|
||
"""获取工作站状态
|
||
|
||
Returns:
|
||
Dict[str, Any]: 工作站状态信息
|
||
"""
|
||
try:
|
||
# 获取基础工作站状态
|
||
base_status = {
|
||
"station_info": self.get_station_info(),
|
||
"bioyond_status": self.bioyond_status
|
||
}
|
||
|
||
# 如果有接口,获取设备列表
|
||
if self.hardware_interface:
|
||
try:
|
||
devices = self.hardware_interface.device_list()
|
||
base_status["devices"] = devices
|
||
except Exception as e:
|
||
logger.warning(f"获取设备列表失败: {e}")
|
||
base_status["devices"] = []
|
||
|
||
return {
|
||
"success": True,
|
||
"data": base_status,
|
||
"action": "get_workstation_status"
|
||
}
|
||
|
||
except Exception as e:
|
||
error_msg = f"获取工作站状态失败: {str(e)}"
|
||
logger.error(error_msg)
|
||
return {
|
||
"success": False,
|
||
"message": error_msg,
|
||
"action": "get_workstation_status"
|
||
}
|
||
|
||
def get_bioyond_status(self) -> Dict[str, Any]:
|
||
"""获取完整的 Bioyond 状态信息
|
||
|
||
这个方法提供了比 bioyond_status 属性更详细的状态信息,
|
||
包括错误处理和格式化的响应结构
|
||
|
||
Returns:
|
||
Dict[str, Any]: 格式化的 Bioyond 状态响应
|
||
"""
|
||
try:
|
||
status = self.bioyond_status
|
||
return {
|
||
"success": True,
|
||
"data": status,
|
||
"action": "get_bioyond_status"
|
||
}
|
||
|
||
except Exception as e:
|
||
error_msg = f"获取 Bioyond 状态失败: {str(e)}"
|
||
logger.error(error_msg)
|
||
return {
|
||
"success": False,
|
||
"message": error_msg,
|
||
"action": "get_bioyond_status"
|
||
}
|
||
|
||
def reset_workstation(self) -> Dict[str, Any]:
|
||
"""重置工作站
|
||
|
||
重置工作站到初始状态
|
||
|
||
Returns:
|
||
Dict[str, Any]: 操作结果
|
||
"""
|
||
try:
|
||
logger.info("开始重置工作站")
|
||
|
||
# 重置调度器
|
||
if self.hardware_interface:
|
||
self.hardware_interface.scheduler_reset()
|
||
|
||
# 刷新物料缓存
|
||
if self.hardware_interface:
|
||
self.hardware_interface.refresh_material_cache()
|
||
|
||
# 重新同步资源
|
||
if self.resource_synchronizer:
|
||
self.resource_synchronizer.sync_from_external()
|
||
|
||
logger.info("工作站重置完成")
|
||
return {
|
||
"success": True,
|
||
"message": "工作站重置成功",
|
||
"action": "reset_workstation"
|
||
}
|
||
|
||
except Exception as e:
|
||
error_msg = f"重置工作站失败: {str(e)}"
|
||
logger.error(error_msg)
|
||
return {
|
||
"success": False,
|
||
"message": error_msg,
|
||
"action": "reset_workstation"
|
||
}
|
||
|
||
# ==================== HTTP 报送处理方法 ====================
|
||
|
||
def process_step_finish_report(self, report_request) -> Dict[str, Any]:
|
||
"""处理步骤完成报送
|
||
|
||
Args:
|
||
report_request: WorkstationReportRequest 对象,包含步骤完成信息
|
||
|
||
Returns:
|
||
Dict[str, Any]: 处理结果
|
||
"""
|
||
try:
|
||
data = report_request.data
|
||
logger.info(f"[步骤完成报送] 订单: {data.get('orderCode')}, 步骤: {data.get('stepName')}")
|
||
logger.info(f" 样品ID: {data.get('sampleId')}")
|
||
logger.info(f" 开始时间: {data.get('startTime')}")
|
||
logger.info(f" 结束时间: {data.get('endTime')}")
|
||
|
||
# TODO: 根据实际业务需求处理步骤完成逻辑
|
||
# 例如:更新数据库、触发后续流程等
|
||
|
||
return {
|
||
"processed": True,
|
||
"step_id": data.get('stepId'),
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理步骤完成报送失败: {e}")
|
||
return {"processed": False, "error": str(e)}
|
||
|
||
def process_sample_finish_report(self, report_request) -> Dict[str, Any]:
|
||
"""处理通量完成报送
|
||
|
||
Args:
|
||
report_request: WorkstationReportRequest 对象,包含通量完成信息
|
||
|
||
Returns:
|
||
Dict[str, Any]: 处理结果
|
||
"""
|
||
try:
|
||
data = report_request.data
|
||
status_names = {
|
||
"0": "待生产", "2": "进样", "10": "开始",
|
||
"20": "完成", "-2": "异常停止", "-3": "人工停止"
|
||
}
|
||
status_desc = status_names.get(str(data.get('status')), f"状态{data.get('status')}")
|
||
|
||
logger.info(f"[通量完成报送] 订单: {data.get('orderCode')}, 样品: {data.get('sampleId')}")
|
||
logger.info(f" 状态: {status_desc}")
|
||
logger.info(f" 开始时间: {data.get('startTime')}")
|
||
logger.info(f" 结束时间: {data.get('endTime')}")
|
||
|
||
# TODO: 根据实际业务需求处理通量完成逻辑
|
||
|
||
return {
|
||
"processed": True,
|
||
"sample_id": data.get('sampleId'),
|
||
"status": data.get('status'),
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理通量完成报送失败: {e}")
|
||
return {"processed": False, "error": str(e)}
|
||
|
||
def process_order_finish_report(self, report_request, used_materials: List) -> Dict[str, Any]:
|
||
"""处理任务完成报送
|
||
|
||
Args:
|
||
report_request: WorkstationReportRequest 对象,包含任务完成信息
|
||
used_materials: 物料使用记录列表
|
||
|
||
Returns:
|
||
Dict[str, Any]: 处理结果
|
||
"""
|
||
try:
|
||
data = report_request.data
|
||
status_names = {"30": "完成", "-11": "异常停止", "-12": "人工停止"}
|
||
status_desc = status_names.get(str(data.get('status')), f"状态{data.get('status')}")
|
||
|
||
logger.info(f"[任务完成报送] 订单: {data.get('orderCode')} - {data.get('orderName')}")
|
||
logger.info(f" 状态: {status_desc}")
|
||
logger.info(f" 开始时间: {data.get('startTime')}")
|
||
logger.info(f" 结束时间: {data.get('endTime')}")
|
||
logger.info(f" 使用物料数量: {len(used_materials)}")
|
||
|
||
# 记录物料使用情况
|
||
for material in used_materials:
|
||
logger.debug(f" 物料: {material.materialId}, 用量: {material.usedQuantity}")
|
||
|
||
# TODO: 根据实际业务需求处理任务完成逻辑
|
||
# 例如:更新物料库存、生成报表等
|
||
|
||
return {
|
||
"processed": True,
|
||
"order_code": data.get('orderCode'),
|
||
"status": data.get('status'),
|
||
"materials_count": len(used_materials),
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理任务完成报送失败: {e}")
|
||
return {"processed": False, "error": str(e)}
|
||
|
||
def process_material_change_report(self, report_data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""处理物料变更报送
|
||
|
||
Args:
|
||
report_data: 物料变更数据
|
||
|
||
Returns:
|
||
Dict[str, Any]: 处理结果
|
||
"""
|
||
try:
|
||
logger.info(f"[物料变更报送] 工作站: {report_data.get('workstation_id')}")
|
||
logger.info(f" 资源ID: {report_data.get('resource_id')}")
|
||
logger.info(f" 变更类型: {report_data.get('change_type')}")
|
||
logger.info(f" 时间戳: {report_data.get('timestamp')}")
|
||
|
||
# TODO: 根据实际业务需求处理物料变更逻辑
|
||
# 例如:同步到资源树、更新Bioyond系统等
|
||
|
||
return {
|
||
"processed": True,
|
||
"resource_id": report_data.get('resource_id'),
|
||
"change_type": report_data.get('change_type'),
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理物料变更报送失败: {e}")
|
||
return {"processed": False, "error": str(e)}
|
||
|
||
|
||
def handle_external_error(self, error_data: Dict[str, Any]) -> Dict[str, Any]:
|
||
"""处理错误处理报送
|
||
|
||
Args:
|
||
error_data: 错误数据(可能是奔曜格式或标准格式)
|
||
|
||
Returns:
|
||
Dict[str, Any]: 处理结果
|
||
"""
|
||
try:
|
||
# 检查是否为奔曜格式
|
||
if 'task' in error_data and 'code' in error_data:
|
||
# 奔曜格式
|
||
logger.error(f"[错误处理报送-奔曜] 任务: {error_data.get('task')}")
|
||
logger.error(f" 错误代码: {error_data.get('code')}")
|
||
logger.error(f" 错误信息: {error_data.get('message', '无')}")
|
||
error_type = "bioyond_error"
|
||
else:
|
||
# 标准格式
|
||
logger.error(f"[错误处理报送] 工作站: {error_data.get('workstation_id')}")
|
||
logger.error(f" 错误类型: {error_data.get('error_type')}")
|
||
logger.error(f" 错误信息: {error_data.get('error_message')}")
|
||
error_type = error_data.get('error_type', 'unknown')
|
||
|
||
# TODO: 根据实际业务需求处理错误
|
||
# 例如:记录日志、发送告警、触发恢复流程等
|
||
|
||
return {
|
||
"handled": True,
|
||
"error_type": error_type,
|
||
"timestamp": datetime.now().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理错误报送失败: {e}")
|
||
return {"handled": False, "error": str(e)}
|
||
|
||
# ==================== 文件加载与其他功能 ====================
|
||
|
||
def load_bioyond_data_from_file(self, file_path: str) -> bool:
|
||
"""从文件加载Bioyond数据(用于测试)"""
|
||
try:
|
||
with open(file_path, "r", encoding="utf-8") as f:
|
||
bioyond_data = json.load(f)
|
||
|
||
logger.info(f"从文件加载Bioyond数据: {file_path}")
|
||
|
||
# 转换为UniLab格式
|
||
unilab_resources = resource_bioyond_to_plr(
|
||
bioyond_data,
|
||
type_mapping=self.bioyond_config["material_type_mappings"],
|
||
deck=self.deck
|
||
)
|
||
|
||
logger.info(f"成功加载 {len(unilab_resources)} 个资源")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"从文件加载Bioyond数据失败: {e}")
|
||
return False
|
||
|
||
|
||
# 使用示例
|
||
def create_bioyond_workstation_example():
|
||
"""创建Bioyond工作站示例"""
|
||
|
||
# 配置参数
|
||
device_id = "bioyond_workstation_001"
|
||
|
||
# 子资源配置
|
||
children = {
|
||
"plate_1": {
|
||
"name": "plate_1",
|
||
"type": "plate",
|
||
"position": {"x": 100, "y": 100, "z": 0},
|
||
"config": {
|
||
"size_x": 127.76,
|
||
"size_y": 85.48,
|
||
"size_z": 14.35,
|
||
"model": "Generic 96 Well Plate"
|
||
}
|
||
}
|
||
}
|
||
|
||
# Bioyond配置
|
||
bioyond_config = {
|
||
"base_url": "http://bioyond.example.com/api",
|
||
"api_key": "your_api_key_here",
|
||
"sync_interval": 60, # 60秒同步一次
|
||
"timeout": 30
|
||
}
|
||
|
||
# Deck配置
|
||
deck_config = {
|
||
"size_x": 1000.0,
|
||
"size_y": 1000.0,
|
||
"size_z": 100.0,
|
||
"model": "BioyondDeck"
|
||
}
|
||
|
||
# 创建工作站
|
||
workstation = BioyondWorkstation(
|
||
station_resource=deck_config,
|
||
bioyond_config=bioyond_config,
|
||
deck_config=deck_config,
|
||
)
|
||
|
||
return workstation
|
||
|
||
|
||
if __name__ == "__main__":
|
||
pass
|