diff --git a/unilabos/devices/workstation/README.md b/unilabos/devices/workstation/README.md new file mode 100644 index 00000000..710a2211 --- /dev/null +++ b/unilabos/devices/workstation/README.md @@ -0,0 +1,184 @@ +# 工作站抽象基类物料系统架构说明 + +## 设计理念 + +基于用户需求"请你帮我系统思考一下,工作站抽象基类的物料系统基类该如何构建",我们最终确定了一个**PyLabRobot Deck为中心**的简化架构。 + +### 核心原则 + +1. **PyLabRobot为物料管理核心**:使用PyLabRobot的Deck系统作为物料管理的基础,利用其成熟的Resource体系 +2. **Graphio转换函数集成**:使用graphio中的`resource_ulab_to_plr`等转换函数实现UniLab与PLR格式的无缝转换 +3. **关注点分离**:基类专注核心物料系统,HTTP服务等功能在子类中实现 +4. **外部系统集成模式**:通过ResourceSynchronizer抽象类提供外部物料系统对接模式 + +## 架构组成 + +### 1. WorkstationBase(基类) +**文件**: `workstation_base.py` + +**核心功能**: +- 使用deck_config和children通过`resource_ulab_to_plr`转换为PLR物料self.deck +- 基础的资源查找和管理功能 +- 抽象的工作流执行接口 +- ResourceSynchronizer集成点 + +**关键代码**: +```python +def _initialize_material_system(self, deck_config: Dict[str, Any], children_config: Dict[str, Any] = None): + """初始化基于PLR的物料系统""" + # 合并deck_config和children + complete_config = self._merge_deck_and_children_config(deck_config, children_config) + + # 使用graphio转换函数转换为PLR资源 + self.deck = resource_ulab_to_plr(complete_config) +``` + +### 2. ResourceSynchronizer(外部系统集成抽象类) +**定义在**: `workstation_base.py` + +**设计目的**: +- 提供外部物料系统(如Bioyong、LIMS等)集成的标准接口 +- 双向同步:从外部系统同步到本地deck,以及将本地变更同步到外部系统 +- 处理外部系统的变更通知 + +**核心方法**: +```python +async def sync_from_external(self) -> bool: + """从外部系统同步物料到本地deck""" + +async def sync_to_external(self, plr_resource) -> bool: + """将本地物料同步到外部系统""" + +async def handle_external_change(self, change_info: Dict[str, Any]) -> bool: + """处理外部系统的变更通知""" +``` + +### 3. WorkstationWithHTTP(子类示例) +**文件**: `workstation_with_http_example.py` + +**扩展功能**: +- HTTP报送接收服务集成 +- 具体工作流实现(液体转移、板洗等) +- Bioyong物料系统同步器示例 +- 外部报送处理方法 + +## 技术栈 + +### 核心依赖 +- **PyLabRobot**: 物料资源管理核心(Deck, Resource, Coordinate) +- **GraphIO转换函数**: UniLab ↔ PLR格式转换 + - `resource_ulab_to_plr`: UniLab格式转PLR格式 + - `resource_plr_to_ulab`: PLR格式转UniLab格式 + - `convert_resources_to_type`: 通用资源类型转换 +- **ROS2**: 基础设备节点通信(BaseROS2DeviceNode) + +### 可选依赖 +- **HTTP服务**: 仅在需要外部报送接收的子类中使用 +- **外部系统API**: 根据具体集成需求添加 + +## 使用示例 + +### 1. 简单工作站(仅PLR物料系统) + +```python +from unilabos.devices.workstation.workstation_base import WorkstationBase + +# Deck配置 +deck_config = { + "size_x": 1200.0, + "size_y": 800.0, + "size_z": 100.0 +} + +# 子资源配置 +children_config = { + "source_plate": { + "name": "source_plate", + "type": "plate", + "position": {"x": 100, "y": 100, "z": 10}, + "config": {"size_x": 127.8, "size_y": 85.5, "size_z": 14.4} + } +} + +# 创建工作站 +workstation = WorkstationBase( + device_id="simple_workstation", + deck_config=deck_config, + children_config=children_config +) + +# 查找资源 +plate = workstation.find_resource_by_name("source_plate") +``` + +### 2. 带HTTP服务的工作站 + +```python +from unilabos.devices.workstation.workstation_with_http_example import WorkstationWithHTTP + +# HTTP服务配置 +http_service_config = { + "enabled": True, + "host": "127.0.0.1", + "port": 8081 +} + +# 创建带HTTP服务的工作站 +workstation = WorkstationWithHTTP( + device_id="http_workstation", + deck_config=deck_config, + children_config=children_config, + http_service_config=http_service_config +) + +# 执行工作流 +success = workstation.execute_workflow("liquid_transfer", { + "volume": 100.0, + "source_wells": ["A1", "A2"], + "dest_wells": ["B1", "B2"] +}) +``` + +### 3. 外部系统集成 + +```python +class BioyongResourceSynchronizer(ResourceSynchronizer): + """Bioyong系统同步器""" + + async def sync_from_external(self) -> bool: + # 从Bioyong API获取物料 + external_materials = await self._fetch_bioyong_materials() + + # 转换并添加到本地deck + for material in external_materials: + await self._add_material_to_deck(material) + + return True +``` + +## 设计优势 + +### 1. **简洁性** +- 基类只专注核心物料管理,没有冗余功能 +- 使用成熟的PyLabRobot作为物料管理基础 + +### 2. **可扩展性** +- 通过子类添加HTTP服务、特定工作流等功能 +- ResourceSynchronizer模式支持任意外部系统集成 + +### 3. **标准化** +- PLR Deck提供标准的资源管理接口 +- Graphio转换函数确保格式一致性 + +### 4. **灵活性** +- 可选择性使用HTTP服务和外部系统集成 +- 支持不同类型的工作站需求 + +## 发展历程 + +1. **初始设计**: 复杂的统一物料系统,包含HTTP服务和多种功能 +2. **PyLabRobot集成**: 引入PLR Deck管理,但保留了ResourceTracker复杂性 +3. **Graphio转换**: 使用graphio转换函数简化初始化 +4. **最终简化**: 专注核心PLR物料系统,HTTP服务移至子类 + +这个架构体现了"用PyLabRobot Deck来管理物料会更好;但是要做好和外部物料系统的对接"的设计理念,以及"现在我只需要在工作站创建的时候,整体使用deck_config和children,一起通过resource_ulab_to_plr转换为plr物料self.deck即可"的简化要求。 diff --git a/unilabos/devices/workstation/workflow_executors.py b/unilabos/devices/workstation/workflow_executors.py index 93f00ae4..41a51c14 100644 --- a/unilabos/devices/workstation/workflow_executors.py +++ b/unilabos/devices/workstation/workflow_executors.py @@ -606,12 +606,12 @@ class ProxyWorkflowExecutor(WorkflowExecutor): """执行代理工作流""" try: # 通过协议节点调用目标设备的工作流 - if self.workstation._protocol_node: - return self.workstation._protocol_node.call_device_method( + if self.workstation._workstation_node: + return self.workstation._workstation_node.call_device_method( self.device_id, 'execute_workflow', workflow_name, parameters ) else: - logger.error("代理模式需要protocol_node") + logger.error("代理模式需要workstation_node") return False except Exception as e: @@ -621,12 +621,12 @@ class ProxyWorkflowExecutor(WorkflowExecutor): def stop_workflow(self, emergency: bool = False) -> bool: """停止代理工作流""" try: - if self.workstation._protocol_node: - return self.workstation._protocol_node.call_device_method( + if self.workstation._workstation_node: + return self.workstation._workstation_node.call_device_method( self.device_id, 'stop_workflow', emergency ) else: - logger.error("代理模式需要protocol_node") + logger.error("代理模式需要workstation_node") return False except Exception as e: diff --git a/unilabos/devices/workstation/workstation_base.py b/unilabos/devices/workstation/workstation_base.py index 4b3c192b..529acfd3 100644 --- a/unilabos/devices/workstation/workstation_base.py +++ b/unilabos/devices/workstation/workstation_base.py @@ -1,24 +1,24 @@ """ 工作站基类 -Workstation Base Class - 单接口模式 +Workstation Base Class - 简化版 -基于单一硬件接口的简化工作站架构 -支持直接模式和代理模式的自动工作流执行器选择 +基于PLR Deck的简化工作站架构 +专注于核心物料系统和工作流管理 """ import time -import traceback -from typing import Dict, Any, List, Optional, Union, TYPE_CHECKING +from typing import Dict, Any, List, Optional, Union from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum -if TYPE_CHECKING: - from unilabos.ros.nodes.presets.protocol_node import ROS2WorkstationNode +try: + from pylabrobot.resources import Deck, Resource as PLRResource + PYLABROBOT_AVAILABLE = True +except ImportError: + PYLABROBOT_AVAILABLE = False + class Deck: pass + class PLRResource: pass -from unilabos.devices.work_station.workstation_material_management import MaterialManagementBase -from unilabos.devices.work_station.workstation_http_service import ( - WorkstationHTTPService, WorkstationReportRequest, MaterialUsage -) from unilabos.utils.log import logger @@ -45,43 +45,69 @@ class WorkflowInfo: parameters_schema: Dict[str, Any] # 参数架构 -class WorkstationBase(ABC): - """工作站基类 - 单接口模式 +class ResourceSynchronizer(ABC): + """资源同步器基类 - 核心设计原则: - 1. 每个工作站只有一个 hardware_interface - 2. 根据接口类型自动选择工作流执行器 - 3. 支持直接模式和代理模式 - 4. 统一的设备操作接口 + 负责与外部物料系统的同步,并对 self.deck 做修改 + """ + + def __init__(self, workstation: 'WorkstationBase'): + self.workstation = workstation + + @abstractmethod + async def sync_from_external(self) -> bool: + """从外部系统同步物料到本地deck""" + pass + + @abstractmethod + async def sync_to_external(self, plr_resource: PLRResource) -> bool: + """将本地物料同步到外部系统""" + pass + + @abstractmethod + async def handle_external_change(self, change_info: Dict[str, Any]) -> bool: + """处理外部系统的变更通知""" + pass + + +class WorkstationBase(ABC): + """工作站基类 - 简化版 + + 核心功能: + 1. 基于 PLR Deck 的物料系统,支持格式转换 + 2. 可选的资源同步器支持外部物料系统 + 3. 简化的工作流管理 """ def __init__( self, device_id: str, - deck_config: Optional[Dict[str, Any]] = None, - http_service_config: Optional[Dict[str, Any]] = None, + deck_config: Dict[str, Any], + children: Optional[Dict[str, Any]] = None, + resource_synchronizer: Optional[ResourceSynchronizer] = None, *args, **kwargs, ): + if not PYLABROBOT_AVAILABLE: + raise ImportError("PyLabRobot 未安装,无法创建工作站") + # 基本配置 self.device_id = device_id - self.deck_config = deck_config or {"size_x": 1000.0, "size_y": 1000.0, "size_z": 500.0} + self.deck_config = deck_config + self.children = children or {} - # HTTP服务配置 - self.http_service_config = http_service_config or { - "enabled": True, - "host": "127.0.0.1", - "port": 8081 - } + # PLR 物料系统 + self.deck: Optional[Deck] = None + self.plr_resources: Dict[str, PLRResource] = {} - # 单一硬件接口 - 可以是具体客户端对象或代理字符串 + # 资源同步器(可选) + self.resource_synchronizer = resource_synchronizer + + # 硬件接口 self.hardware_interface: Union[Any, str] = None # 协议节点引用(用于代理模式) - self._protocol_node: Optional['ROS2WorkstationNode'] = None - - # 工作流执行器(基于通信接口类型自动选择) - self.workflow_executor: Optional['WorkflowExecutor'] = None + self._workstation_node: Optional['ROS2WorkstationNode'] = None # 工作流状态 self.current_workflow_status = WorkflowStatus.IDLE @@ -89,90 +115,136 @@ class WorkstationBase(ABC): self.workflow_start_time = None self.workflow_parameters = {} - # 错误处理 - self.error_history = [] - self.action_results = {} - # 支持的工作流(静态预定义) self.supported_workflows: Dict[str, WorkflowInfo] = {} - # 初始化工作站模块 - self.material_management: MaterialManagementBase = self._create_material_management_module() + # 初始化物料系统 + self._initialize_material_system() # 注册支持的工作流 self._register_supported_workflows() - # 启动HTTP报送接收服务 - self.http_service = None - self._start_http_service() - - logger.info(f"工作站 {device_id} 初始化完成(单接口模式)") + logger.info(f"工作站 {device_id} 初始化完成(简化版)") + def _initialize_material_system(self): + """初始化物料系统 - 使用 graphio 转换""" + try: + from unilabos.resources.graphio import resource_ulab_to_plr + + # 1. 合并 deck_config 和 children 创建完整的资源树 + complete_resource_config = self._create_complete_resource_config() + + # 2. 使用 graphio 转换为 PLR 资源 + self.deck = resource_ulab_to_plr(complete_resource_config, plr_model=True) + + # 3. 建立资源映射 + self._build_resource_mappings(self.deck) + + # 4. 如果有资源同步器,执行初始同步 + if self.resource_synchronizer: + # 这里可以异步执行,暂时跳过 + pass + + logger.info(f"工作站 {self.device_id} 物料系统初始化成功,创建了 {len(self.plr_resources)} 个资源") + + except Exception as e: + logger.error(f"工作站 {self.device_id} 物料系统初始化失败: {e}") + raise + + def _create_complete_resource_config(self) -> Dict[str, Any]: + """创建完整的资源配置 - 合并 deck_config 和 children""" + # 创建主 deck 配置 + deck_resource = { + "id": f"{self.device_id}_deck", + "name": f"{self.device_id}_deck", + "type": "deck", + "position": {"x": 0, "y": 0, "z": 0}, + "config": { + "size_x": self.deck_config.get("size_x", 1000.0), + "size_y": self.deck_config.get("size_y", 1000.0), + "size_z": self.deck_config.get("size_z", 100.0), + **{k: v for k, v in self.deck_config.items() if k not in ["size_x", "size_y", "size_z"]} + }, + "data": {}, + "children": [], + "parent": None + } + + # 添加子资源 + if self.children: + children_list = [] + for child_id, child_config in self.children.items(): + child_resource = self._normalize_child_resource(child_id, child_config, deck_resource["id"]) + children_list.append(child_resource) + deck_resource["children"] = children_list + + return deck_resource + + def _normalize_child_resource(self, resource_id: str, config: Dict[str, Any], parent_id: str) -> Dict[str, Any]: + """标准化子资源配置""" + return { + "id": resource_id, + "name": config.get("name", resource_id), + "type": config.get("type", "container"), + "position": self._normalize_position(config.get("position", {})), + "config": config.get("config", {}), + "data": config.get("data", {}), + "children": [], # 简化版本:只支持一层子资源 + "parent": parent_id + } + + def _normalize_position(self, position: Any) -> Dict[str, float]: + """标准化位置信息""" + if isinstance(position, dict): + return { + "x": float(position.get("x", 0)), + "y": float(position.get("y", 0)), + "z": float(position.get("z", 0)) + } + elif isinstance(position, (list, tuple)) and len(position) >= 2: + return { + "x": float(position[0]), + "y": float(position[1]), + "z": float(position[2]) if len(position) > 2 else 0.0 + } + else: + return {"x": 0.0, "y": 0.0, "z": 0.0} + + def _build_resource_mappings(self, deck: Deck): + """递归构建资源映射""" + def add_resource_recursive(resource: PLRResource): + if hasattr(resource, 'name'): + self.plr_resources[resource.name] = resource + + if hasattr(resource, 'children'): + for child in resource.children: + add_resource_recursive(child) + + add_resource_recursive(deck) + + # ============ 硬件接口管理 ============ + def set_hardware_interface(self, hardware_interface: Union[Any, str]): """设置硬件接口""" self.hardware_interface = hardware_interface - - # 根据接口类型自动创建工作流执行器 - self._setup_workflow_executor() - logger.info(f"工作站 {self.device_id} 硬件接口设置: {type(hardware_interface).__name__}") - def set_protocol_node(self, protocol_node: 'ROS2WorkstationNode'): + def set_workstation_node(self, workstation_node: 'ROS2WorkstationNode'): """设置协议节点引用(用于代理模式)""" - self._protocol_node = protocol_node + self._workstation_node = workstation_node logger.info(f"工作站 {self.device_id} 关联协议节点") - def _setup_workflow_executor(self): - """根据硬件接口类型自动设置工作流执行器""" - if self.hardware_interface is None: - return - - # 动态导入工作流执行器类 - try: - from unilabos.devices.work_station.workflow_executors import ( - ProxyWorkflowExecutor, ModbusWorkflowExecutor, - HttpWorkflowExecutor, PyLabRobotWorkflowExecutor - ) - except ImportError: - logger.warning("工作流执行器模块未找到,将使用基础执行器") - self.workflow_executor = None - return - - # 检查是否为代理字符串 - if isinstance(self.hardware_interface, str) and self.hardware_interface.startswith("proxy:"): - self.workflow_executor = ProxyWorkflowExecutor(self) - logger.info(f"工作站 {self.device_id} 使用代理工作流执行器") - - # 检查是否为Modbus客户端 - elif hasattr(self.hardware_interface, 'write_register') and hasattr(self.hardware_interface, 'read_register'): - self.workflow_executor = ModbusWorkflowExecutor(self) - logger.info(f"工作站 {self.device_id} 使用Modbus工作流执行器") - - # 检查是否为HTTP客户端 - elif hasattr(self.hardware_interface, 'post') or hasattr(self.hardware_interface, 'get'): - self.workflow_executor = HttpWorkflowExecutor(self) - logger.info(f"工作站 {self.device_id} 使用HTTP工作流执行器") - - # 检查是否为PyLabRobot设备 - elif hasattr(self.hardware_interface, 'transfer_liquid') or hasattr(self.hardware_interface, 'pickup_tips'): - self.workflow_executor = PyLabRobotWorkflowExecutor(self) - logger.info(f"工作站 {self.device_id} 使用PyLabRobot工作流执行器") - - else: - logger.warning(f"工作站 {self.device_id} 无法识别硬件接口类型: {type(self.hardware_interface)}") - self.workflow_executor = None - - # ============ 统一的设备操作接口 ============ + # ============ 设备操作接口 ============ def call_device_method(self, method: str, *args, **kwargs) -> Any: """调用设备方法的统一接口""" # 1. 代理模式:通过协议节点转发 if isinstance(self.hardware_interface, str) and self.hardware_interface.startswith("proxy:"): - if not self._protocol_node: - raise RuntimeError("代理模式需要设置protocol_node") + if not self._workstation_node: + raise RuntimeError("代理模式需要设置workstation_node") device_id = self.hardware_interface[6:] # 移除 "proxy:" 前缀 - return self._protocol_node.call_device_method(device_id, method, *args, **kwargs) + return self._workstation_node.call_device_method(device_id, method, *args, **kwargs) # 2. 直接模式:直接调用硬件接口方法 elif self.hardware_interface and hasattr(self.hardware_interface, method): @@ -201,22 +273,54 @@ class WorkstationBase(ABC): except: return False - # ============ 工作流控制接口 ============ + # ============ 物料系统接口 ============ + + def get_deck(self) -> Deck: + """获取主 Deck""" + return self.deck + + def get_all_resources(self) -> Dict[str, PLRResource]: + """获取所有 PLR 资源""" + return self.plr_resources.copy() + + def find_resource_by_name(self, name: str) -> Optional[PLRResource]: + """按名称查找资源""" + return self.plr_resources.get(name) + + def find_resources_by_type(self, resource_type: type) -> List[PLRResource]: + """按类型查找资源""" + return [res for res in self.plr_resources.values() + if isinstance(res, resource_type)] + + async def sync_with_external_system(self) -> bool: + """与外部物料系统同步""" + if not self.resource_synchronizer: + logger.info(f"工作站 {self.device_id} 没有配置资源同步器") + return True + + try: + success = await self.resource_synchronizer.sync_from_external() + if success: + logger.info(f"工作站 {self.device_id} 外部同步成功") + else: + logger.warning(f"工作站 {self.device_id} 外部同步失败") + return success + except Exception as e: + logger.error(f"工作站 {self.device_id} 外部同步异常: {e}") + return False + + # ============ 简化的工作流控制 ============ def execute_workflow(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: - """执行工作流 - 委托给工作流执行器""" - if not self.workflow_executor: - logger.error(f"工作站 {self.device_id} 工作流执行器未初始化") - return False - + """执行工作流""" try: # 设置工作流状态 self.current_workflow_status = WorkflowStatus.INITIALIZING self.workflow_parameters = parameters self.workflow_start_time = time.time() - # 委托给工作流执行器 - success = self.workflow_executor.execute_workflow(workflow_name, parameters) + # 委托给子类实现 + success = self._execute_workflow_impl(workflow_name, parameters) if success: self.current_workflow_status = WorkflowStatus.RUNNING @@ -232,16 +336,8 @@ class WorkstationBase(ABC): logger.error(f"工作站 {self.device_id} 执行工作流失败: {e}") return False - def start_workflow(self, workflow_type: str, parameters: Dict[str, Any] = None) -> bool: - """启动工作流 - 兼容旧接口""" - return self.execute_workflow(workflow_type, parameters or {}) - def stop_workflow(self, emergency: bool = False) -> bool: """停止工作流""" - if not self.workflow_executor: - logger.warning(f"工作站 {self.device_id} 工作流执行器未初始化") - return True - try: if self.current_workflow_status in [WorkflowStatus.IDLE, WorkflowStatus.STOPPED]: logger.warning(f"工作站 {self.device_id} 没有正在运行的工作流") @@ -249,8 +345,8 @@ class WorkstationBase(ABC): self.current_workflow_status = WorkflowStatus.STOPPING - # 委托给工作流执行器 - success = self.workflow_executor.stop_workflow(emergency) + # 委托给子类实现 + success = self._stop_workflow_impl(emergency) if success: self.current_workflow_status = WorkflowStatus.STOPPED @@ -289,179 +385,19 @@ class WorkstationBase(ABC): return 0.0 return time.time() - self.workflow_start_time - @property - def error_count(self) -> int: - """获取错误计数""" - return len(self.error_history) - - @property - def last_error(self) -> Optional[Dict[str, Any]]: - """获取最后一个错误""" - return self.error_history[-1] if self.error_history else None - # ============ 抽象方法 - 子类必须实现 ============ - @abstractmethod - def _create_material_management_module(self) -> MaterialManagementBase: - """创建物料管理模块 - 子类必须实现""" - pass - @abstractmethod def _register_supported_workflows(self): """注册支持的工作流 - 子类必须实现""" pass - - # ============ HTTP服务管理 ============ - - def _start_http_service(self): - """启动HTTP报送接收服务""" - try: - if not self.http_service_config.get("enabled", True): - logger.info(f"工作站 {self.device_id} HTTP报送接收服务已禁用") - return - - host = self.http_service_config.get("host", "127.0.0.1") - port = self.http_service_config.get("port", 8081) - - self.http_service = WorkstationHTTPService( - workstation_handler=self, - host=host, - port=port - ) - - logger.info(f"工作站 {self.device_id} HTTP报送接收服务启动成功: {host}:{port}") - - except Exception as e: - logger.error(f"工作站 {self.device_id} 启动HTTP报送接收服务失败: {e}") - self.http_service = None - - def _stop_http_service(self): - """停止HTTP报送接收服务""" - try: - if self.http_service: - self.http_service.stop() - self.http_service = None - logger.info(f"工作站 {self.device_id} HTTP报送接收服务已停止") - except Exception as e: - logger.error(f"工作站 {self.device_id} 停止HTTP报送接收服务失败: {e}") - - # ============ 报送处理方法 ============ - - def process_material_change_report(self, report) -> Dict[str, Any]: - """处理物料变更报送""" - try: - logger.info(f"处理物料变更报送: {report.workstation_id} -> {report.resource_id} ({report.change_type})") - - result = { - 'processed': True, - 'resource_id': report.resource_id, - 'change_type': report.change_type, - 'timestamp': time.time() - } - - # 更新本地物料管理系统 - if hasattr(self, 'material_management'): - try: - self.material_management.sync_external_material_change(report) - except Exception as e: - logger.warning(f"同步物料变更到本地管理系统失败: {e}") - - return result - - except Exception as e: - logger.error(f"处理物料变更报送失败: {e}") - return {'processed': False, 'error': str(e)} - - def process_step_finish_report(self, request: WorkstationReportRequest) -> Dict[str, Any]: - """处理步骤完成报送(统一LIMS协议规范)""" - try: - data = request.data - logger.info(f"处理步骤完成报送: {data['orderCode']} - {data['stepName']}") - - result = { - 'processed': True, - 'order_code': data['orderCode'], - 'step_id': data['stepId'], - 'timestamp': time.time() - } - - return result - - except Exception as e: - logger.error(f"处理步骤完成报送失败: {e}") - return {'processed': False, 'error': str(e)} - - def process_sample_finish_report(self, request: WorkstationReportRequest) -> Dict[str, Any]: - """处理样品完成报送""" - try: - data = request.data - logger.info(f"处理样品完成报送: {data['sampleId']}") - - result = { - 'processed': True, - 'sample_id': data['sampleId'], - 'timestamp': time.time() - } - - return result - - except Exception as e: - logger.error(f"处理样品完成报送失败: {e}") - return {'processed': False, 'error': str(e)} - - def process_order_finish_report(self, request: WorkstationReportRequest, used_materials: List[MaterialUsage]) -> Dict[str, Any]: - """处理订单完成报送""" - try: - data = request.data - logger.info(f"处理订单完成报送: {data['orderCode']}") - - result = { - 'processed': True, - 'order_code': data['orderCode'], - 'used_materials': len(used_materials), - 'timestamp': time.time() - } - - return result - - except Exception as e: - logger.error(f"处理订单完成报送失败: {e}") - return {'processed': False, 'error': str(e)} - - def handle_external_error(self, error_request): - """处理外部错误报告""" - try: - logger.error(f"收到外部错误报告: {error_request}") - - # 记录错误 - error_record = { - 'timestamp': time.time(), - 'error_type': error_request.get('error_type', 'unknown'), - 'error_message': error_request.get('message', ''), - 'source': error_request.get('source', 'external'), - 'context': error_request.get('context', {}) - } - - self.error_history.append(error_record) - - # 处理紧急停止情况 - if error_request.get('emergency_stop', False): - self._trigger_emergency_stop(error_record['error_message']) - - return {'processed': True, 'error_id': len(self.error_history)} - - except Exception as e: - logger.error(f"处理外部错误失败: {e}") - return {'processed': False, 'error': str(e)} - - def _trigger_emergency_stop(self, reason: str): - """触发紧急停止""" - logger.critical(f"触发紧急停止: {reason}") - self.stop_workflow(emergency=True) - - def __del__(self): - """清理资源""" - try: - self._stop_http_service() - except: - pass + + @abstractmethod + def _execute_workflow_impl(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: + """执行工作流的具体实现 - 子类必须实现""" + pass + + @abstractmethod + def _stop_workflow_impl(self, emergency: bool = False) -> bool: + """停止工作流的具体实现 - 子类必须实现""" + pass