From a632fd495ecfc151b918aa43e3000e55e6c51166 Mon Sep 17 00:00:00 2001 From: Junhan Chang Date: Thu, 25 Sep 2025 20:56:29 +0800 Subject: [PATCH] bioyond station with communication init and resource sync --- .../workstation/bioyond_studio/__init__.py | 0 .../workstation/bioyond_studio/station.py | 114 ++++++++++++++---- .../devices/workstation/workstation_base.py | 61 ++-------- 3 files changed, 101 insertions(+), 74 deletions(-) create mode 100644 unilabos/devices/workstation/bioyond_studio/__init__.py diff --git a/unilabos/devices/workstation/bioyond_studio/__init__.py b/unilabos/devices/workstation/bioyond_studio/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/unilabos/devices/workstation/bioyond_studio/station.py b/unilabos/devices/workstation/bioyond_studio/station.py index 6152a4eb..bb701576 100644 --- a/unilabos/devices/workstation/bioyond_studio/station.py +++ b/unilabos/devices/workstation/bioyond_studio/station.py @@ -8,10 +8,89 @@ from typing import Dict, Any, List, Optional, Union import json from unilabos.devices.workstation.workstation_base import WorkstationBase, ResourceSynchronizer -from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker +from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondV1RPC from unilabos.utils.log import logger from unilabos.resources.graphio import resource_bioyond_to_plr +from .config import API_CONFIG, WORKFLOW_MAPPINGS + + +class BioyondResourceSynchronizer(ResourceSynchronizer): + """Bioyond资源同步器 + + 负责与Bioyond系统进行物料数据的同步 + """ + + def __init__(self, workstation: 'BioyondWorkstation'): + super().__init__(workstation) + self.bioyond_api_client = None + self.sync_interval = 60 # 默认60秒同步一次 + self.last_sync_time = 0 + + def initialize(self) -> bool: + """初始化Bioyond资源同步器""" + try: + self.bioyond_api_client = self.workstation.hardware_interface + if self.bioyond_api_client is None: + logger.error("Bioyond API客户端未初始化") + return False + + # 设置同步间隔 + self.sync_interval = self.workstation.bioyond_config.get("sync_interval", 60) + + logger.info("Bioyond资源同步器初始化完成") + return True + except Exception as e: + logger.error(f"Bioyond资源同步器初始化失败: {e}") + return False + + def sync_from_external(self) -> bool: + """从Bioyond系统同步物料数据""" + try: + if self.bioyond_api_client is None: + logger.error("Bioyond API客户端未初始化") + return False + + bioyond_data = self.bioyond_api_client.fetch_materials() + if not bioyond_data: + logger.warning("从Bioyond获取的物料数据为空") + return False + + # 转换为UniLab格式 + unilab_resources = resource_bioyond_to_plr(bioyond_data, deck=self.workstation.deck) + + logger.info(f"从Bioyond同步了 {len(unilab_resources)} 个资源") + return True + except Exception as e: + logger.error(f"从Bioyond同步物料数据失败: {e}") + return False + + def sync_to_external(self, resource: Any) -> bool: + """将本地物料数据变更同步到Bioyond系统""" + try: + if self.bioyond_api_client is None: + logger.error("Bioyond API客户端未初始化") + return False + + # 调用入库、出库操作 + # bioyond_format_data = self._convert_resource_to_bioyond_format(resource) + # success = await self.bioyond_api_client.update_material(bioyond_format_data) + # + # if success + except: + pass + + def handle_external_change(self, change_info: Dict[str, Any]) -> bool: + """处理Bioyond系统的变更通知""" + try: + # 这里可以实现对Bioyond变更的处理逻辑 + logger.info(f"处理Bioyond变更通知: {change_info}") + + return True + except Exception as e: + logger.error(f"处理Bioyond变更通知失败: {e}") + return False + class BioyondWorkstation(WorkstationBase): """Bioyond工作站 @@ -26,41 +105,30 @@ class BioyondWorkstation(WorkstationBase): *args, **kwargs, ): - # 设置Bioyond配置 - self.bioyond_config = bioyond_config or { - "base_url": "http://localhost:8080", - "api_key": "", - "sync_interval": 30, - "timeout": 30 - } - - # 设置默认deck配置 - + self._create_communication_module(bioyond_config) + # 初始化父类 super().__init__( - #桌子 + # 桌子 deck=deck, *args, **kwargs, ) + self.resource_synchronizer = BioyondResourceSynchronizer(self) + self.resource_synchronizer.sync_from_external() # TODO: self._ros_node里面拿属性 logger.info(f"Bioyond工作站初始化完成") - def _create_communication_module(self): + def _create_communication_module(self, config: Optional[Dict[str, Any]] = None) -> None: """创建Bioyond通信模块""" - # 暂时返回None,因为工作站基类没有强制要求通信模块 + self.bioyond_config = config or { + **API_CONFIG, + "workflow_mappings": WORKFLOW_MAPPINGS + } + self.hardware_interface = BioyondV1RPC(self.bioyond_config) return None - def _create_material_management_module(self) -> BioyondMaterialManagement: - """创建Bioyond物料管理模块""" - # 获取必要的属性,如果不存在则使用默认值 - device_id = getattr(self, 'device_id', 'bioyond_workstation') - resource_tracker = getattr(self, 'resource_tracker', None) - children_config = getattr(self, '_children', {}) - - - def _register_supported_workflows(self): """注册Bioyond支持的工作流""" from unilabos.devices.workstation.workstation_base import WorkflowInfo diff --git a/unilabos/devices/workstation/workstation_base.py b/unilabos/devices/workstation/workstation_base.py index 62b5e657..1988249f 100644 --- a/unilabos/devices/workstation/workstation_base.py +++ b/unilabos/devices/workstation/workstation_base.py @@ -112,17 +112,17 @@ class ResourceSynchronizer(ABC): self.workstation = workstation @abstractmethod - async def sync_from_external(self) -> bool: + def sync_from_external(self) -> bool: """从外部系统同步物料到本地deck""" pass @abstractmethod - async def sync_to_external(self, plr_resource: PLRResource) -> bool: + def sync_to_external(self, plr_resource: PLRResource) -> bool: """将本地物料同步到外部系统""" pass @abstractmethod - async def handle_external_change(self, change_info: Dict[str, Any]) -> bool: + def handle_external_change(self, change_info: Dict[str, Any]) -> bool: """处理外部系统的变更通知""" pass @@ -147,17 +147,15 @@ class WorkstationBase(ABC): def __init__( self, - deck: PLRResource, + deck: Deck, *args, **kwargs, # 必须有kwargs ): - # 基本配置 - print(deck) - self.deck_config = deck - # PLR 物料系统 - self.deck: Optional[Deck] = None + self.deck: Optional[Deck] = deck self.plr_resources: Dict[str, PLRResource] = {} + + self.resource_synchronizer = None # type: Optional[ResourceSynchronizer] # 硬件接口 self.hardware_interface: Union[Any, str] = None @@ -173,46 +171,7 @@ class WorkstationBase(ABC): def post_init(self, ros_node: ROS2WorkstationNode) -> None: # 初始化物料系统 self._ros_node = ros_node - self._initialize_material_system() - - def _initialize_material_system(self): - """初始化物料系统 - 使用 graphio 转换""" - pass - - def _create_complete_resource_config(self) -> Dict[str, Any]: - """创建完整的资源配置 - 合并 deck_config 和 children""" - # 创建主 deck 配置 - return {} - - def _normalize_child_resource(self, resource_id: str, config: Dict[str, Any], parent_id: str) -> Dict[str, Any]: - """标准化子资源配置""" - return { - "id": resource_id, - "name": config.get("name", resource_id), - "type": config.get("type", "container"), - "position": self._normalize_position(config.get("position", {})), - "config": config.get("config", {}), - "data": config.get("data", {}), - "children": [], # 简化版本:只支持一层子资源 - "parent": parent_id, - } - - def _normalize_position(self, position: Any) -> Dict[str, float]: - """标准化位置信息""" - if isinstance(position, dict): - return { - "x": float(position.get("x", 0)), - "y": float(position.get("y", 0)), - "z": float(position.get("z", 0)), - } - elif isinstance(position, (list, tuple)) and len(position) >= 2: - return { - "x": float(position[0]), - "y": float(position[1]), - "z": float(position[2]) if len(position) > 2 else 0.0, - } - else: - return {"x": 0.0, "y": 0.0, "z": 0.0} + self._ros_node.update_resource([self.deck]) def _build_resource_mappings(self, deck: Deck): """递归构建资源映射""" @@ -296,14 +255,14 @@ class WorkstationBase(ABC): """按类型查找资源""" return [res for res in self.plr_resources.values() if isinstance(res, resource_type)] - async def sync_with_external_system(self) -> bool: + def sync_with_external_system(self) -> bool: """与外部物料系统同步""" if not self.resource_synchronizer: logger.info(f"工作站 {self._ros_node.device_id} 没有配置资源同步器") return True try: - success = await self.resource_synchronizer.sync_from_external() + success = self.resource_synchronizer.sync_from_external() if success: logger.info(f"工作站 {self._ros_node.device_id} 外部同步成功") else: