Files
Uni-Lab-OS/unilabos/devices/workstation/bioyond_studio/station.py

402 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Bioyond工作站实现
Bioyond Workstation Implementation
集成Bioyond物料管理的工作站示例
"""
from typing import Dict, Any, List, Optional, Union
import json
from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC
from unilabos.utils.log import logger
from unilabos.resources.graphio import resource_bioyond_to_plr
from .config import API_CONFIG, WORKFLOW_MAPPINGS
class BioyondResourceSynchronizer(ResourceSynchronizer):
    """Resource synchronizer for the Bioyond system.

    Moves material data between the owning workstation and Bioyond. The
    workstation's ``hardware_interface`` is used as the API client; it is
    attached by :meth:`initialize`, not by the constructor, so
    :meth:`initialize` must succeed before any sync call can do real work.
    """

    def __init__(self, workstation: 'BioyondWorkstation'):
        """Create a synchronizer bound to *workstation*.

        Args:
            workstation: The workstation whose ``hardware_interface``,
                ``bioyond_config`` and ``deck`` are read by the other methods.
        """
        super().__init__(workstation)
        # Attached in initialize(); every sync method bails out while it is None.
        self.bioyond_api_client = None
        # Seconds between syncs; replaced from the workstation config in initialize().
        self.sync_interval = 60
        # NOTE(review): never updated anywhere in this class.
        self.last_sync_time = 0

    def initialize(self) -> bool:
        """Attach the API client and read the sync interval from the config.

        Returns:
            True when the workstation exposes a non-None ``hardware_interface``;
            False when it is None or any exception is raised while reading it.
        """
        try:
            self.bioyond_api_client = self.workstation.hardware_interface
            if self.bioyond_api_client is None:
                logger.error("Bioyond API客户端未初始化")
                return False
            # Configured interval, falling back to 60 seconds.
            self.sync_interval = self.workstation.bioyond_config.get("sync_interval", 60)
            logger.info("Bioyond资源同步器初始化完成")
            return True
        except Exception as e:
            logger.error(f"Bioyond资源同步器初始化失败: {e}")
            return False

    def sync_from_external(self) -> bool:
        """Fetch materials from Bioyond and convert them for the workstation deck.

        Returns:
            True when materials were fetched and converted; False when the
            client is missing, the fetch returned nothing, or an exception
            was raised.
        """
        try:
            if self.bioyond_api_client is None:
                logger.error("Bioyond API客户端未初始化")
                return False
            bioyond_data = self.bioyond_api_client.fetch_materials()
            if not bioyond_data:
                logger.warning("从Bioyond获取的物料数据为空")
                return False
            # Convert to the UniLab/PLR representation. The result is only
            # counted here; presumably the conversion attaches the resources
            # to the deck it is given - TODO confirm.
            unilab_resources = resource_bioyond_to_plr(bioyond_data, deck=self.workstation.deck)
            logger.info(f"从Bioyond同步了 {len(unilab_resources)} 个资源")
            return True
        except Exception as e:
            logger.error(f"从Bioyond同步物料数据失败: {e}")
            return False

    def sync_to_external(self, resource: Any) -> bool:
        """Push a local material change to Bioyond (not implemented yet).

        Args:
            resource: The local resource whose change should be uploaded.

        Returns:
            Always False for now: either the client is missing, or the upload
            is skipped because it has not been implemented. Previously this
            method fell through and returned None while silently swallowing
            every exception.
        """
        try:
            if self.bioyond_api_client is None:
                logger.error("Bioyond API客户端未初始化")
                return False
            # TODO: convert `resource` to the Bioyond format and call the
            # inbound/outbound (update_material) API once it is available.
            logger.warning("同步物料到Bioyond的功能尚未实现,已跳过")
            return False
        except Exception as e:
            logger.error(f"同步物料数据到Bioyond失败: {e}")
            return False

    def handle_external_change(self, change_info: Dict[str, Any]) -> bool:
        """Handle a change notification coming from Bioyond.

        Currently the notification is only logged.

        Args:
            change_info: Notification payload; logged as-is.

        Returns:
            True after logging; False if logging itself raised.
        """
        try:
            # Hook for real change handling; for now the payload is just logged.
            logger.info(f"处理Bioyond变更通知: {change_info}")
            return True
        except Exception as e:
            logger.error(f"处理Bioyond变更通知失败: {e}")
            return False
class BioyondWorkstation(WorkstationBase):
    """Workstation that integrates Bioyond material management.

    On construction it builds the Bioyond RPC client, initializes the base
    workstation with the given deck, and performs one initial material sync
    through a :class:`BioyondResourceSynchronizer`.

    NOTE(review): several methods below read ``self.material_management`` and
    ``self.supported_workflows``, which are not defined in this class;
    presumably they come from ``WorkstationBase`` - confirm.
    """

    def __init__(
        self,
        bioyond_config: Optional[Dict[str, Any]] = None,
        deck: Optional[Any] = None,
        *args,
        **kwargs,
    ):
        """Build the RPC client, initialize the base class and run a first sync.

        Args:
            bioyond_config: Bioyond connection settings. When falsy, the
                defaults from ``API_CONFIG`` plus ``WORKFLOW_MAPPINGS`` are used.
            deck: Deck object forwarded to ``WorkstationBase``. The previous
                annotation ``Optional[str, Any]`` was invalid and raised
                ``TypeError`` as soon as the class body was evaluated.
            *args: Extra positional arguments forwarded to ``WorkstationBase``.
            **kwargs: Extra keyword arguments forwarded to ``WorkstationBase``.
        """
        # The RPC client has to exist before the synchronizer is initialized
        # below, because the synchronizer reads `hardware_interface`.
        self._create_communication_module(bioyond_config)
        super().__init__(*args, deck=deck, **kwargs)

        # Strong references to fire-and-forget tasks so they are not
        # garbage-collected before they finish.
        self._background_tasks = set()

        self.resource_synchronizer = BioyondResourceSynchronizer(self)
        # The synchronizer only gets its API client in initialize(); without
        # this call the initial sync always failed with "client not initialized".
        if self.resource_synchronizer.initialize():
            self.resource_synchronizer.sync_from_external()
        else:
            logger.warning("Bioyond资源同步器初始化失败,跳过初始物料同步")
        # TODO: read attributes from self._ros_node
        logger.info("Bioyond工作站初始化完成")

    def _create_communication_module(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Store the effective config and create the Bioyond RPC client.

        Args:
            config: Explicit configuration. When falsy, ``API_CONFIG`` merged
                with a ``"workflow_mappings"`` entry is used instead. A
                provided config is used as-is and is not merged with defaults.
        """
        self.bioyond_config = config or {
            **API_CONFIG,
            "workflow_mappings": WORKFLOW_MAPPINGS,
        }
        self.hardware_interface = BioyondV1RPC(self.bioyond_config)

    def _register_supported_workflows(self) -> None:
        """Register the two Bioyond workflows in ``self.supported_workflows``."""
        from unilabos.devices.workstation.workstation_base import WorkflowInfo

        # Workflow: pull materials from Bioyond.
        self.supported_workflows["bioyond_sync"] = WorkflowInfo(
            name="bioyond_sync",
            description="从Bioyond系统同步物料",
            parameters={
                "sync_type": {"type": "string", "default": "full", "options": ["full", "incremental"]},
                "force_sync": {"type": "boolean", "default": False},
            },
        )
        # Workflow: push local material changes to Bioyond.
        self.supported_workflows["bioyond_update"] = WorkflowInfo(
            name="bioyond_update",
            description="将本地物料变更同步到Bioyond",
            parameters={
                "material_ids": {"type": "list", "default": []},
                "sync_all": {"type": "boolean", "default": True},
            },
        )
        logger.info(f"注册了 {len(self.supported_workflows)} 个Bioyond工作流")

    async def execute_bioyond_sync_workflow(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Run the "pull from Bioyond" workflow.

        Args:
            parameters: Workflow parameters. ``sync_type`` (default ``"full"``)
                is only used in log/result messages; ``force_sync`` is accepted
                by the workflow schema but is not used here.

        Returns:
            A dict with ``status`` of ``"success"`` (plus ``synced_resources``),
            ``"failed"``, or ``"error"`` (with the exception text in
            ``message``).
        """
        try:
            sync_type = parameters.get("sync_type", "full")
            logger.info(f"开始执行Bioyond同步工作流: {sync_type}")
            material_manager = self.material_management
            # Incremental sync is not implemented: every sync_type performs
            # the same full pull.
            success = await material_manager.sync_from_bioyond()
            if success:
                result = {
                    "status": "success",
                    "message": f"Bioyond同步完成: {sync_type}",
                    "synced_resources": len(material_manager.plr_resources),
                }
            else:
                result = {
                    "status": "failed",
                    "message": "Bioyond同步失败",
                }
            logger.info(f"Bioyond同步工作流执行完成: {result['status']}")
            return result
        except Exception as e:
            logger.error(f"Bioyond同步工作流执行失败: {e}")
            return {
                "status": "error",
                "message": str(e),
            }

    async def execute_bioyond_update_workflow(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Run the "push to Bioyond" workflow.

        Args:
            parameters: ``sync_all`` (default True) pushes every resource in
                ``material_management.plr_resources``; otherwise only the
                resources found for the ids in ``material_ids`` are pushed.

        Returns:
            A dict with ``updated_resources`` (number of pushes that returned a
            truthy result) and ``total_resources``. ``status`` is ``"success"``
            whenever no exception was raised, even if individual pushes
            failed; it is ``"error"`` when an exception was raised.
        """
        try:
            material_ids = parameters.get("material_ids", [])
            sync_all = parameters.get("sync_all", True)
            logger.info(f"开始执行Bioyond更新工作流: sync_all={sync_all}")
            material_manager = self.material_management
            success_count = 0
            if sync_all:
                # Push every known resource.
                for resource in material_manager.plr_resources.values():
                    if await material_manager.sync_to_bioyond(resource):
                        success_count += 1
            else:
                # Push only the requested ids; ids that cannot be found are skipped.
                for material_id in material_ids:
                    resource = material_manager.find_material_by_id(material_id)
                    if resource and await material_manager.sync_to_bioyond(resource):
                        success_count += 1
            result = {
                "status": "success",
                "message": "Bioyond更新完成",
                "updated_resources": success_count,
                "total_resources": len(material_ids) if not sync_all else len(material_manager.plr_resources),
            }
            logger.info(f"Bioyond更新工作流执行完成: {result['status']}")
            return result
        except Exception as e:
            logger.error(f"Bioyond更新工作流执行失败: {e}")
            return {
                "status": "error",
                "message": str(e),
            }

    def get_bioyond_status(self) -> Dict[str, Any]:
        """Return a snapshot of the Bioyond integration state.

        Returns:
            A dict with connection flag, sync interval, resource count, deck
            size and the active config; or ``{"error": <text>}`` when any of
            those could not be read.
        """
        try:
            material_manager = self.material_management
            return {
                "bioyond_connected": material_manager.bioyond_api_client is not None,
                "sync_interval": material_manager.sync_interval,
                "total_resources": len(material_manager.plr_resources),
                "deck_size": {
                    "x": material_manager.plr_deck.size_x,
                    "y": material_manager.plr_deck.size_y,
                    "z": material_manager.plr_deck.size_z,
                },
                # NOTE(review): the whole config is returned; if it carries
                # credentials (e.g. an "api_key" entry) they are exposed here.
                "bioyond_config": self.bioyond_config,
            }
        except Exception as e:
            logger.error(f"获取Bioyond状态失败: {e}")
            return {
                "error": str(e),
            }

    def load_bioyond_data_from_file(self, file_path: str) -> bool:
        """Load Bioyond data from a JSON file (intended for testing).

        The parsed data is converted to UniLab resources and handed to
        ``material_management._assign_resources_to_deck``. When called from
        inside a running event loop the assignment is scheduled as a
        background task; otherwise it is run to completion before returning.
        Previously ``asyncio.create_task`` was called unconditionally, which
        raises ``RuntimeError`` when no loop is running.

        Args:
            file_path: Path to a UTF-8 JSON file.

        Returns:
            True when the file was loaded and the assignment was run or
            scheduled; False on any exception.
        """
        try:
            import asyncio

            with open(file_path, 'r', encoding='utf-8') as f:
                bioyond_data = json.load(f)
            material_manager = self.material_management
            # A dict with a "data" key goes through the container converter;
            # anything else goes through the plain converter.
            if isinstance(bioyond_data, dict) and "data" in bioyond_data:
                unilab_resources = material_manager.resource_bioyond_container_to_ulab(bioyond_data)
            else:
                unilab_resources = material_manager.resource_bioyond_to_ulab(bioyond_data)

            assignment = material_manager._assign_resources_to_deck(unilab_resources)
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # No loop in this thread: run the assignment synchronously.
                asyncio.run(assignment)
            else:
                task = loop.create_task(assignment)
                # Keep a reference until the task is done (see asyncio docs).
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
            logger.info(f"从文件 {file_path} 加载了 {len(unilab_resources)} 个Bioyond资源")
            return True
        except Exception as e:
            logger.error(f"从文件加载Bioyond数据失败: {e}")
            return False
# 使用示例
def create_bioyond_workstation_example():
    """Build a sample BioyondWorkstation from hard-coded demo settings.

    Returns:
        The constructed ``BioyondWorkstation``.
    """
    # Demo identifier and child-resource layout. Neither value is passed to
    # the constructor below; they only illustrate the expected shape.
    device_id = "bioyond_workstation_001"
    plate_geometry = {
        "size_x": 127.76,
        "size_y": 85.48,
        "size_z": 14.35,
        "model": "Generic 96 Well Plate",
    }
    children = {
        "plate_1": {
            "name": "plate_1",
            "type": "plate",
            "position": {"x": 100, "y": 100, "z": 0},
            "config": plate_geometry,
        },
    }

    # Deck dimensions, forwarded twice below (as station_resource and deck_config).
    deck_config = dict(
        size_x=1000.0,
        size_y=1000.0,
        size_z=100.0,
        model="BioyondDeck",
    )

    # Placeholder Bioyond connection settings; sync_interval is in seconds.
    bioyond_config = dict(
        base_url="http://bioyond.example.com/api",
        api_key="your_api_key_here",
        sync_interval=60,
        timeout=30,
    )

    return BioyondWorkstation(
        station_resource=deck_config,
        bioyond_config=bioyond_config,
        deck_config=deck_config,
    )
if __name__ == "__main__":
    # Manual test script: load converted Bioyond resources from a JSON file,
    # place one resource in each of six stacks on a deck, and upload the deck
    # tree to the remote UniLab service.
    #
    # The imports stay at module scope inside this `if` because a wildcard
    # import is not allowed inside a function.
    from unilabos.resources.graphio import resource_ulab_to_plr, convert_resources_to_type
    # `Stack` is not imported by name anywhere here, so presumably it comes
    # from this wildcard import - TODO import it explicitly.
    from Bioyond_wuliao import *
    from typing import List
    from pylabrobot.resources import Resource as PLRResource
    import json
    from pylabrobot.resources.deck import Deck
    from pylabrobot.resources.coordinate import Coordinate

    with open("./bioyond_test_yibin3_unilab_result_corr.json", "r", encoding="utf-8") as f:
        bioyond_resources_unilab = json.load(f)
    print(f"成功读取 JSON 文件,包含 {len(bioyond_resources_unilab)} 个资源")
    ulab_resources = convert_resources_to_type(bioyond_resources_unilab, List[PLRResource])
    print(f"转换结果类型: {type(ulab_resources)}")
    print(f"转换结果长度: {len(ulab_resources) if ulab_resources else 0}")

    # One resource is placed per stack, so at least this many are required.
    # Fail with a clear message instead of an IndexError/TypeError further down.
    STACK_COUNT = 6
    if not ulab_resources or len(ulab_resources) < STACK_COUNT:
        raise SystemExit(
            f"need at least {STACK_COUNT} resources, got {len(ulab_resources) if ulab_resources else 0}"
        )

    deck = Deck(size_x=2000,
                size_y=653.5,
                size_z=900)

    # Stack0..Stack5, spaced 100 apart along x at y=100.
    stacks = [
        Stack(name=f"Stack{index}", location=Coordinate(100 * index, 100, 0))
        for index in range(STACK_COUNT)
    ]

    # Attach every stack to the deck. Stack0 was previously left off the deck
    # even though it received ulab_resources[0], so that resource never made
    # it into the uploaded tree.
    for stack in stacks:
        deck.assign_child_resource(stack, stack.location)

    # NOTE(review): each resource is placed at the stack's own deck location,
    # as in the original script - confirm that this offset is intended.
    for stack, resource in zip(stacks, ulab_resources):
        stack.assign_child_resource(resource, stack.location)

    from unilabos.resources.graphio import convert_resources_from_type
    from unilabos.app.web.client import http_client

    resources = convert_resources_from_type([deck], [PLRResource])
    print(resources)
    http_client.remote_addr = "https://uni-lab.bohrium.com/api/v1"
    # NOTE(review): hard-coded credential committed to source; it should be
    # moved to configuration or an environment variable and rotated.
    http_client.auth = "ED634D1C"
    http_client.resource_add(resources, database_process_later=False)