mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-18 13:31:20 +00:00
375 lines
14 KiB
Python
375 lines
14 KiB
Python
"""
|
||
Bioyond物料管理实现
|
||
Bioyond Material Management Implementation
|
||
|
||
基于Bioyond系统的物料管理,支持从Bioyond系统同步物料到UniLab工作站
|
||
"""
|
||
from typing import Dict, Any, List, Optional, Union
|
||
import json
|
||
import asyncio
|
||
from abc import ABC, abstractmethod
|
||
|
||
from pylabrobot.resources import (
|
||
Resource as PLRResource,
|
||
Container,
|
||
Deck,
|
||
Coordinate as PLRCoordinate,
|
||
)
|
||
|
||
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker
|
||
from unilabos.utils.log import logger
|
||
from unilabos.resources.graphio import (
|
||
resource_plr_to_ulab,
|
||
resource_ulab_to_plr,
|
||
resource_bioyond_to_ulab,
|
||
resource_bioyond_container_to_ulab,
|
||
resource_ulab_to_bioyond
|
||
)
|
||
from .workstation_material_management import MaterialManagementBase
|
||
|
||
|
||
class BioyondMaterialManagement(MaterialManagementBase):
|
||
"""Bioyond物料管理类
|
||
|
||
实现从Bioyond系统同步物料到UniLab工作站的功能:
|
||
1. 从Bioyond系统获取物料数据
|
||
2. 转换为UniLab格式
|
||
3. 同步到PyLabRobot Deck
|
||
4. 支持双向同步
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
device_id: str,
|
||
deck_config: Dict[str, Any],
|
||
resource_tracker: DeviceNodeResourceTracker,
|
||
children_config: Dict[str, Dict[str, Any]] = None,
|
||
bioyond_config: Dict[str, Any] = None
|
||
):
|
||
self.bioyond_config = bioyond_config or {}
|
||
self.bioyond_api_client = None
|
||
self.sync_interval = self.bioyond_config.get("sync_interval", 30) # 同步间隔(秒)
|
||
|
||
# 初始化父类
|
||
super().__init__(device_id, deck_config, resource_tracker, children_config)
|
||
|
||
# 初始化Bioyond API客户端
|
||
self._initialize_bioyond_client()
|
||
|
||
# 启动同步任务
|
||
self._start_sync_task()
|
||
|
||
def _initialize_bioyond_client(self):
|
||
"""初始化Bioyond API客户端"""
|
||
try:
|
||
# 这里应该根据实际的Bioyond API实现
|
||
# 暂时使用模拟客户端
|
||
self.bioyond_api_client = BioyondAPIClient(self.bioyond_config)
|
||
logger.info(f"Bioyond API客户端初始化成功")
|
||
except Exception as e:
|
||
logger.error(f"Bioyond API客户端初始化失败: {e}")
|
||
self.bioyond_api_client = None
|
||
|
||
def _start_sync_task(self):
|
||
"""启动同步任务"""
|
||
if self.bioyond_api_client:
|
||
# 创建异步同步任务
|
||
asyncio.create_task(self._periodic_sync())
|
||
logger.info(f"Bioyond同步任务已启动,间隔: {self.sync_interval}秒")
|
||
|
||
async def _periodic_sync(self):
|
||
"""定期同步任务"""
|
||
while True:
|
||
try:
|
||
await self.sync_from_bioyond()
|
||
await asyncio.sleep(self.sync_interval)
|
||
except Exception as e:
|
||
logger.error(f"Bioyond同步任务出错: {e}")
|
||
await asyncio.sleep(self.sync_interval)
|
||
|
||
async def sync_from_bioyond(self) -> bool:
|
||
"""从Bioyond系统同步物料"""
|
||
try:
|
||
if not self.bioyond_api_client:
|
||
logger.warning("Bioyond API客户端未初始化")
|
||
return False
|
||
|
||
# 1. 从Bioyond获取物料数据
|
||
bioyond_data = await self.bioyond_api_client.get_materials()
|
||
if not bioyond_data:
|
||
logger.warning("从Bioyond获取物料数据为空")
|
||
return False
|
||
|
||
# 2. 转换为UniLab格式
|
||
if isinstance(bioyond_data, dict) and "data" in bioyond_data:
|
||
# 容器格式数据
|
||
unilab_resources = resource_bioyond_container_to_ulab(bioyond_data)
|
||
else:
|
||
# 物料列表格式数据
|
||
unilab_resources = resource_bioyond_to_ulab(bioyond_data)
|
||
|
||
# 3. 转换为PLR格式并分配到Deck
|
||
await self._assign_resources_to_deck(unilab_resources)
|
||
|
||
logger.info(f"从Bioyond同步了 {len(unilab_resources)} 个资源")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"从Bioyond同步物料失败: {e}")
|
||
return False
|
||
|
||
async def sync_to_bioyond(self, plr_resource: PLRResource) -> bool:
|
||
"""将本地物料变更同步到Bioyond系统"""
|
||
try:
|
||
if not self.bioyond_api_client:
|
||
logger.warning("Bioyond API客户端未初始化")
|
||
return False
|
||
|
||
# 1. 转换为UniLab格式
|
||
unilab_resource = resource_plr_to_ulab(plr_resource)
|
||
|
||
# 2. 转换为Bioyond格式
|
||
bioyond_materials = resource_ulab_to_bioyond([unilab_resource])
|
||
|
||
# 3. 发送到Bioyond系统
|
||
success = await self.bioyond_api_client.update_materials(bioyond_materials)
|
||
|
||
if success:
|
||
logger.info(f"成功同步物料 {plr_resource.name} 到Bioyond")
|
||
else:
|
||
logger.warning(f"同步物料 {plr_resource.name} 到Bioyond失败")
|
||
|
||
return success
|
||
|
||
except Exception as e:
|
||
logger.error(f"同步物料到Bioyond失败: {e}")
|
||
return False
|
||
|
||
async def _assign_resources_to_deck(self, unilab_resources: List[Dict[str, Any]]):
|
||
"""将UniLab资源分配到Deck"""
|
||
try:
|
||
# 转换为PLR格式
|
||
from unilabos.resources.graphio import list_to_nested_dict
|
||
nested_resources = list_to_nested_dict(unilab_resources)
|
||
plr_resources = resource_ulab_to_plr(nested_resources)
|
||
|
||
# 分配资源到Deck
|
||
if hasattr(plr_resources, 'children'):
|
||
resources_to_assign = plr_resources.children
|
||
elif isinstance(plr_resources, list):
|
||
resources_to_assign = plr_resources
|
||
else:
|
||
resources_to_assign = [plr_resources]
|
||
|
||
for resource in resources_to_assign:
|
||
try:
|
||
# 获取资源位置
|
||
if hasattr(resource, 'location') and resource.location:
|
||
location = PLRCoordinate(resource.location.x, resource.location.y, resource.location.z)
|
||
else:
|
||
location = PLRCoordinate(0, 0, 0)
|
||
|
||
# 分配资源到Deck
|
||
self.plr_deck.assign_child_resource(resource, location)
|
||
|
||
# 注册到resource tracker
|
||
self.resource_tracker.add_resource(resource)
|
||
|
||
# 保存资源引用
|
||
self.plr_resources[resource.name] = resource
|
||
|
||
except Exception as e:
|
||
logger.error(f"分配资源 {resource.name} 到Deck失败: {e}")
|
||
|
||
logger.info(f"成功分配了 {len(resources_to_assign)} 个资源到Deck")
|
||
|
||
except Exception as e:
|
||
logger.error(f"分配资源到Deck失败: {e}")
|
||
|
||
def _create_resource_by_type(
|
||
self,
|
||
resource_id: str,
|
||
resource_type: str,
|
||
config: Dict[str, Any],
|
||
data: Dict[str, Any],
|
||
location: PLRCoordinate
|
||
) -> Optional[PLRResource]:
|
||
"""根据类型创建Bioyond相关资源"""
|
||
try:
|
||
# 这里可以根据需要实现特定的Bioyond资源类型
|
||
# 目前使用通用的容器类型
|
||
if resource_type in ["container", "plate", "well"]:
|
||
return self._create_generic_container(resource_id, resource_type, config, data, location)
|
||
else:
|
||
logger.warning(f"未知的Bioyond资源类型: {resource_type}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建Bioyond资源失败 {resource_id} ({resource_type}): {e}")
|
||
return None
|
||
|
||
def _create_generic_container(
|
||
self,
|
||
resource_id: str,
|
||
resource_type: str,
|
||
config: Dict[str, Any],
|
||
data: Dict[str, Any],
|
||
location: PLRCoordinate
|
||
) -> Optional[PLRResource]:
|
||
"""创建通用容器资源"""
|
||
try:
|
||
from pylabrobot.resources import Plate, Well
|
||
|
||
if resource_type == "plate":
|
||
return Plate(
|
||
name=resource_id,
|
||
size_x=config.get("size_x", 127.76),
|
||
size_y=config.get("size_y", 85.48),
|
||
size_z=config.get("size_z", 14.35),
|
||
location=location,
|
||
category="plate"
|
||
)
|
||
elif resource_type == "well":
|
||
return Well(
|
||
name=resource_id,
|
||
size_x=config.get("size_x", 9.0),
|
||
size_y=config.get("size_y", 9.0),
|
||
size_z=config.get("size_z", 10.0),
|
||
location=location,
|
||
category="well"
|
||
)
|
||
else:
|
||
return Container(
|
||
name=resource_id,
|
||
size_x=config.get("size_x", 50.0),
|
||
size_y=config.get("size_y", 50.0),
|
||
size_z=config.get("size_z", 10.0),
|
||
location=location,
|
||
category="container"
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"创建通用容器失败 {resource_id}: {e}")
|
||
return None
|
||
|
||
def get_bioyond_materials(self) -> List[Dict[str, Any]]:
|
||
"""获取当前Bioyond物料列表"""
|
||
try:
|
||
# 将当前PLR资源转换为Bioyond格式
|
||
bioyond_materials = []
|
||
for resource in self.plr_resources.values():
|
||
unilab_resource = resource_plr_to_ulab(resource)
|
||
bioyond_materials.extend(resource_ulab_to_bioyond([unilab_resource]))
|
||
return bioyond_materials
|
||
except Exception as e:
|
||
logger.error(f"获取Bioyond物料列表失败: {e}")
|
||
return []
|
||
|
||
def update_material_from_bioyond(self, material_id: str, bioyond_data: Dict[str, Any]) -> bool:
|
||
"""从Bioyond数据更新指定物料"""
|
||
try:
|
||
# 查找现有物料
|
||
material = self.find_material_by_id(material_id)
|
||
if not material:
|
||
logger.warning(f"未找到物料: {material_id}")
|
||
return False
|
||
|
||
# 转换Bioyond数据为UniLab格式
|
||
unilab_resources = resource_bioyond_to_ulab([bioyond_data])
|
||
if not unilab_resources:
|
||
logger.warning(f"转换Bioyond数据失败: {material_id}")
|
||
return False
|
||
|
||
# 更新物料属性
|
||
unilab_resource = unilab_resources[0]
|
||
material.name = unilab_resource.get("name", material.name)
|
||
|
||
# 更新位置
|
||
position = unilab_resource.get("position", {})
|
||
if position:
|
||
material.location = PLRCoordinate(
|
||
position.get("x", 0),
|
||
position.get("y", 0),
|
||
position.get("z", 0)
|
||
)
|
||
|
||
logger.info(f"成功更新物料: {material_id}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新物料失败 {material_id}: {e}")
|
||
return False
|
||
|
||
|
||
class BioyondAPIClient:
|
||
"""Bioyond API客户端(模拟实现)
|
||
|
||
实际使用时需要根据Bioyond系统的API接口实现
|
||
"""
|
||
|
||
def __init__(self, config: Dict[str, Any]):
|
||
self.config = config
|
||
self.base_url = config.get("base_url", "http://localhost:8080")
|
||
self.api_key = config.get("api_key", "")
|
||
self.timeout = config.get("timeout", 30)
|
||
|
||
async def get_materials(self) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
|
||
"""从Bioyond系统获取物料数据"""
|
||
try:
|
||
# 这里应该实现实际的API调用
|
||
# 暂时返回模拟数据
|
||
logger.info("从Bioyond API获取物料数据")
|
||
|
||
# 模拟API调用延迟
|
||
await asyncio.sleep(0.1)
|
||
|
||
# 返回模拟数据(实际应该从API获取)
|
||
return {
|
||
"data": [],
|
||
"code": 1,
|
||
"message": "success",
|
||
"timestamp": 1234567890
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Bioyond API调用失败: {e}")
|
||
return None
|
||
|
||
async def update_materials(self, materials: List[Dict[str, Any]]) -> bool:
|
||
"""更新Bioyond系统中的物料数据"""
|
||
try:
|
||
# 这里应该实现实际的API调用
|
||
logger.info(f"更新Bioyond系统中的 {len(materials)} 个物料")
|
||
|
||
# 模拟API调用延迟
|
||
await asyncio.sleep(0.1)
|
||
|
||
# 模拟成功响应
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新Bioyond物料失败: {e}")
|
||
return False
|
||
|
||
async def get_material_by_id(self, material_id: str) -> Optional[Dict[str, Any]]:
|
||
"""根据ID获取单个物料"""
|
||
try:
|
||
# 这里应该实现实际的API调用
|
||
logger.info(f"从Bioyond API获取物料: {material_id}")
|
||
|
||
# 模拟API调用延迟
|
||
await asyncio.sleep(0.1)
|
||
|
||
# 返回模拟数据
|
||
return {
|
||
"id": material_id,
|
||
"name": f"material_{material_id}",
|
||
"type": "container",
|
||
"quantity": 1.0,
|
||
"unit": "个"
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取Bioyond物料失败 {material_id}: {e}")
|
||
return None
|