From 227ff1284abe1f8f372064b393d09cda64032377 Mon Sep 17 00:00:00 2001 From: Junhan Chang Date: Tue, 19 Aug 2025 21:35:27 +0800 Subject: [PATCH] add workstation template and battery example --- .../coin_cell_assembly_workstation.py | 454 ++++++ unilabos/device_comms/workstation_base.py | 1302 +++++++++++++++++ .../device_comms/workstation_communication.py | 600 ++++++++ .../workstation_material_management.py | 583 ++++++++ 4 files changed, 2939 insertions(+) create mode 100644 unilabos/device_comms/coin_cell_assembly_workstation.py create mode 100644 unilabos/device_comms/workstation_base.py create mode 100644 unilabos/device_comms/workstation_communication.py create mode 100644 unilabos/device_comms/workstation_material_management.py diff --git a/unilabos/device_comms/coin_cell_assembly_workstation.py b/unilabos/device_comms/coin_cell_assembly_workstation.py new file mode 100644 index 00000000..62d9b09c --- /dev/null +++ b/unilabos/device_comms/coin_cell_assembly_workstation.py @@ -0,0 +1,454 @@ +""" +纽扣电池组装工作站 +Coin Cell Assembly Workstation + +继承工作站基类,实现纽扣电池特定功能 +""" +from typing import Dict, Any, List, Optional, Union + +from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker +from unilabos.device_comms.workstation_base import WorkstationBase, WorkflowInfo +from unilabos.device_comms.workstation_communication import ( + WorkstationCommunicationBase, CommunicationConfig, CommunicationProtocol, CoinCellCommunication +) +from unilabos.device_comms.workstation_material_management import ( + MaterialManagementBase, CoinCellMaterialManagement +) +from unilabos.utils.log import logger + + +class CoinCellAssemblyWorkstation(WorkstationBase): + """纽扣电池组装工作站 + + 基于工作站基类,实现纽扣电池制造的特定功能: + 1. 纽扣电池特定的通信协议 + 2. 纽扣电池物料管理(料板、极片、电池等) + 3. 电池制造工作流 + 4. 质量检查工作流 + """ + + def __init__( + self, + device_id: str, + children: Dict[str, Dict[str, Any]], + protocol_type: Union[str, List[str]] = "BatteryManufacturingProtocol", + resource_tracker: Optional[DeviceNodeResourceTracker] = None, + modbus_config: Optional[Dict[str, Any]] = None, + deck_config: Optional[Dict[str, Any]] = None, + csv_path: str = "./coin_cell_assembly.csv", + *args, + **kwargs, + ): + # 设置通信配置 + modbus_config = modbus_config or {"host": "127.0.0.1", "port": 5021} + self.communication_config = CommunicationConfig( + protocol=CommunicationProtocol.MODBUS_TCP, + host=modbus_config["host"], + port=modbus_config["port"], + timeout=modbus_config.get("timeout", 5.0), + retry_count=modbus_config.get("retry_count", 3) + ) + + # 设置台面配置 + self.deck_config = deck_config or { + "size_x": 1620.0, + "size_y": 1270.0, + "size_z": 500.0 + } + + # CSV地址映射文件路径 + self.csv_path = csv_path + + # 创建资源跟踪器(如果没有提供) + if resource_tracker is None: + from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker + resource_tracker = DeviceNodeResourceTracker() + + # 初始化基类 + super().__init__( + device_id=device_id, + children=children, + protocol_type=protocol_type, + resource_tracker=resource_tracker, + communication_config=self.communication_config, + deck_config=self.deck_config, + *args, + **kwargs + ) + + logger.info(f"纽扣电池组装工作站 {device_id} 初始化完成") + + def _create_communication_module(self) -> WorkstationCommunicationBase: + """创建纽扣电池通信模块""" + return CoinCellCommunication( + communication_config=self.communication_config, + csv_path=self.csv_path + ) + + def _create_material_management_module(self) -> MaterialManagementBase: + """创建纽扣电池物料管理模块""" + return CoinCellMaterialManagement( + device_id=self.device_id, + deck_config=self.deck_config, + resource_tracker=self.resource_tracker, + children_config=self.children + ) + + def _register_supported_workflows(self): + """注册纽扣电池工作流""" + # 电池制造工作流 + self.supported_workflows["battery_manufacturing"] = WorkflowInfo( + name="battery_manufacturing", + description="纽扣电池制造工作流", + estimated_duration=300.0, # 5分钟 + required_materials=["cathode_sheet", "anode_sheet", "separator", "electrolyte"], + output_product="coin_cell_battery", + parameters_schema={ + "type": "object", + "properties": { + "electrolyte_num": { + "type": "integer", + "description": "电解液瓶数", + "minimum": 1, + "maximum": 32 + }, + "electrolyte_volume": { + "type": "number", + "description": "电解液体积 (μL)", + "minimum": 0.1, + "maximum": 100.0 + }, + "assembly_pressure": { + "type": "number", + "description": "组装压力 (N)", + "minimum": 100.0, + "maximum": 5000.0 + }, + "cathode_material": { + "type": "string", + "description": "正极材料类型", + "enum": ["LiFePO4", "LiCoO2", "NCM", "LMO"] + }, + "anode_material": { + "type": "string", + "description": "负极材料类型", + "enum": ["Graphite", "LTO", "Silicon"] + } + }, + "required": ["electrolyte_num", "electrolyte_volume", "assembly_pressure"] + } + ) + + # 质量检查工作流 + self.supported_workflows["quality_inspection"] = WorkflowInfo( + name="quality_inspection", + description="产品质量检查工作流", + estimated_duration=60.0, # 1分钟 + required_materials=["finished_battery"], + output_product="quality_report", + parameters_schema={ + "type": "object", + "properties": { + "test_voltage": { + "type": "boolean", + "description": "是否测试电压", + "default": True + }, + "test_capacity": { + "type": "boolean", + "description": "是否测试容量", + "default": False + }, + "voltage_threshold": { + "type": "number", + "description": "电压阈值 (V)", + "minimum": 2.0, + "maximum": 4.5, + "default": 3.0 + } + } + } + ) + + # 设备初始化工作流 + self.supported_workflows["device_initialization"] = WorkflowInfo( + name="device_initialization", + description="设备初始化工作流", + estimated_duration=30.0, # 30秒 + required_materials=[], + output_product="ready_status", + parameters_schema={ + "type": "object", + "properties": { + "auto_mode": { + "type": "boolean", + "description": "是否启用自动模式", + "default": True + } + } + } + ) + + # ============ 纽扣电池特定方法 ============ + + def get_electrode_sheet_inventory(self) -> Dict[str, int]: + """获取极片库存统计""" + try: + sheets = self.material_management.find_electrode_sheets() + inventory = {} + + for sheet in sheets: + material_type = getattr(sheet, 'material_type', 'unknown') + inventory[material_type] = inventory.get(material_type, 0) + 1 + + return inventory + + except Exception as e: + logger.error(f"获取极片库存失败: {e}") + return {} + + def get_battery_production_statistics(self) -> Dict[str, Any]: + """获取电池生产统计""" + try: + production_data = self.communication.get_production_data() + + # 添加物料统计 + electrode_inventory = self.get_electrode_sheet_inventory() + battery_count = len(self.material_management.find_batteries()) + + return { + **production_data, + "electrode_inventory": electrode_inventory, + "finished_battery_count": battery_count, + "material_plates": len(self.material_management.find_material_plates()), + "press_slots": len(self.material_management.find_press_slots()) + } + + except Exception as e: + logger.error(f"获取生产统计失败: {e}") + return {"error": str(e)} + + def create_new_battery(self, battery_spec: Dict[str, Any]) -> Optional[str]: + """创建新电池资源""" + try: + from unilabos.device_comms.button_battery_station import Battery + import uuid + + battery_id = f"battery_{uuid.uuid4().hex[:8]}" + + battery = Battery( + name=battery_id, + diameter=battery_spec.get("diameter", 20.0), + height=battery_spec.get("height", 3.2), + max_volume=battery_spec.get("max_volume", 100.0), + barcode=battery_spec.get("barcode", "") + ) + + # 添加到物料管理系统 + self.material_management.plr_resources[battery_id] = battery + self.material_management.resource_tracker.add_resource(battery) + + logger.info(f"创建新电池资源: {battery_id}") + return battery_id + + except Exception as e: + logger.error(f"创建电池资源失败: {e}") + return None + + def find_available_press_slot(self) -> Optional[str]: + """查找可用的压制槽""" + try: + press_slots = self.material_management.find_press_slots() + + for slot in press_slots: + if hasattr(slot, 'has_battery') and not slot.has_battery(): + return slot.name + + return None + + except Exception as e: + logger.error(f"查找可用压制槽失败: {e}") + return None + + def get_glove_box_environment(self) -> Dict[str, Any]: + """获取手套箱环境数据""" + try: + device_status = self.communication.get_device_status() + environment = device_status.get("environment", {}) + + return { + "pressure": environment.get("glove_box_pressure", 0.0), + "o2_content": environment.get("o2_content", 0.0), + "water_content": environment.get("water_content", 0.0), + "is_safe": ( + environment.get("o2_content", 0.0) < 10.0 and # 氧气含量 < 10ppm + environment.get("water_content", 0.0) < 1.0 # 水分含量 < 1ppm + ) + } + + except Exception as e: + logger.error(f"获取手套箱环境失败: {e}") + return {"error": str(e)} + + def start_data_export(self, file_path: str) -> bool: + """开始生产数据导出""" + try: + return self.communication.start_data_export(file_path, export_interval=5.0) + except Exception as e: + logger.error(f"启动数据导出失败: {e}") + return False + + def stop_data_export(self) -> bool: + """停止生产数据导出""" + try: + return self.communication.stop_data_export() + except Exception as e: + logger.error(f"停止数据导出失败: {e}") + return False + + # ============ 重写基类方法以支持纽扣电池特定功能 ============ + + def start_workflow(self, workflow_type: str, parameters: Dict[str, Any] = None) -> bool: + """启动工作流(重写以支持纽扣电池特定预处理)""" + try: + # 进行纽扣电池特定的预检查 + if workflow_type == "battery_manufacturing": + # 检查手套箱环境 + env = self.get_glove_box_environment() + if not env.get("is_safe", False): + logger.error("手套箱环境不安全,无法启动电池制造工作流") + return False + + # 检查是否有可用的压制槽 + available_slot = self.find_available_press_slot() + if not available_slot: + logger.error("没有可用的压制槽,无法启动电池制造工作流") + return False + + # 检查极片库存 + electrode_inventory = self.get_electrode_sheet_inventory() + if not electrode_inventory.get("cathode", 0) > 0 or not electrode_inventory.get("anode", 0) > 0: + logger.error("极片库存不足,无法启动电池制造工作流") + return False + + # 调用基类方法 + return super().start_workflow(workflow_type, parameters) + + except Exception as e: + logger.error(f"启动纽扣电池工作流失败: {e}") + return False + + # ============ 纽扣电池特定状态属性 ============ + + @property + def electrode_sheet_count(self) -> int: + """极片总数""" + try: + return len(self.material_management.find_electrode_sheets()) + except: + return 0 + + @property + def battery_count(self) -> int: + """电池总数""" + try: + return len(self.material_management.find_batteries()) + except: + return 0 + + @property + def available_press_slots(self) -> int: + """可用压制槽数""" + try: + press_slots = self.material_management.find_press_slots() + available = 0 + for slot in press_slots: + if hasattr(slot, 'has_battery') and not slot.has_battery(): + available += 1 + return available + except: + return 0 + + @property + def environment_status(self) -> Dict[str, Any]: + """环境状态""" + return self.get_glove_box_environment() + + +# ============ 工厂函数 ============ + +def create_coin_cell_workstation( + device_id: str, + config_file: str, + modbus_host: str = "127.0.0.1", + modbus_port: int = 5021, + csv_path: str = "./coin_cell_assembly.csv" +) -> CoinCellAssemblyWorkstation: + """工厂函数:创建纽扣电池组装工作站 + + Args: + device_id: 设备ID + config_file: 配置文件路径(JSON格式) + modbus_host: Modbus主机地址 + modbus_port: Modbus端口 + csv_path: 地址映射CSV文件路径 + + Returns: + CoinCellAssemblyWorkstation: 工作站实例 + """ + import json + + try: + # 加载配置文件 + with open(config_file, 'r', encoding='utf-8') as f: + config = json.load(f) + + # 提取配置 + children = config.get("children", {}) + deck_config = config.get("deck_config", {}) + + # 创建工作站 + workstation = CoinCellAssemblyWorkstation( + device_id=device_id, + children=children, + modbus_config={ + "host": modbus_host, + "port": modbus_port + }, + deck_config=deck_config, + csv_path=csv_path + ) + + logger.info(f"纽扣电池工作站创建成功: {device_id}") + return workstation + + except Exception as e: + logger.error(f"创建纽扣电池工作站失败: {e}") + raise + + +if __name__ == "__main__": + # 示例用法 + workstation = create_coin_cell_workstation( + device_id="coin_cell_station_01", + config_file="./button_battery_workstation.json", + modbus_host="127.0.0.1", + modbus_port=5021 + ) + + # 启动电池制造工作流 + success = workstation.start_workflow( + "battery_manufacturing", + { + "electrolyte_num": 16, + "electrolyte_volume": 50.0, + "assembly_pressure": 2000.0, + "cathode_material": "LiFePO4", + "anode_material": "Graphite" + } + ) + + if success: + print("电池制造工作流启动成功") + else: + print("电池制造工作流启动失败") diff --git a/unilabos/device_comms/workstation_base.py b/unilabos/device_comms/workstation_base.py new file mode 100644 index 00000000..7b61c17b --- /dev/null +++ b/unilabos/device_comms/workstation_base.py @@ -0,0 +1,1302 @@ +""" +工作站基类 +Workstation Base Class + +集成通信、物料管理和工作流的工作站基类 +融合子设备管理、动态工作流注册等高级功能 +""" +import asyncio +import json +import time +import traceback +from typing import Dict, Any, List, Optional, Union, Callable +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum + +from rclpy.action import ActionServer, ActionClient +from rclpy.action.server import ServerGoalHandle +from rclpy.callback_groups import ReentrantCallbackGroup +from rclpy.service import Service +from unilabos_msgs.srv import SerialCommand +from unilabos_msgs.msg import Resource + +from unilabos.ros.nodes.presets.protocol_node import ROS2ProtocolNode +from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker +from unilabos.device_comms.workstation_communication import WorkstationCommunicationBase, CommunicationConfig +from unilabos.device_comms.workstation_material_management import MaterialManagementBase +from unilabos.ros.msgs.message_converter import convert_to_ros_msg, convert_from_ros_msg +from unilabos.utils.log import logger +from unilabos.utils.type_check import serialize_result_info + + +class DeviceType(Enum): + """设备类型枚举""" + LOGICAL = "logical" # 逻辑设备 + COMMUNICATION = "communication" # 通信设备 (modbus/opcua/serial) + PROTOCOL = "protocol" # 协议设备 + + +@dataclass +class CommunicationInterface: + """通信接口配置""" + device_id: str # 通信设备ID + read_method: str # 读取方法名 + write_method: str # 写入方法名 + protocol_type: str # 协议类型 (modbus/opcua/serial) + config: Dict[str, Any] # 协议特定配置 + + +@dataclass +class WorkflowStep: + """工作流步骤定义""" + device_id: str + action_name: str + action_kwargs: Dict[str, Any] + depends_on: Optional[List[str]] = None # 依赖的步骤ID + step_id: Optional[str] = None + timeout: Optional[float] = None + retry_count: int = 0 + + +@dataclass +class WorkflowDefinition: + """工作流定义""" + name: str + description: str + steps: List[WorkflowStep] + input_schema: Dict[str, Any] + output_schema: Dict[str, Any] + metadata: Dict[str, Any] + + +class WorkflowStatus(Enum): + """工作流状态""" + IDLE = "idle" + INITIALIZING = "initializing" + RUNNING = "running" + PAUSED = "paused" + STOPPING = "stopping" + STOPPED = "stopped" + ERROR = "error" + COMPLETED = "completed" + + +@dataclass +class WorkflowInfo: + """工作流信息""" + name: str + description: str + estimated_duration: float # 预估持续时间(秒) + required_materials: List[str] # 所需物料类型 + output_product: str # 输出产品类型 + parameters_schema: Dict[str, Any] # 参数架构 + + +class WorkstationBase(ROS2ProtocolNode, ABC): + """工作站基类 + + 提供工作站的核心功能: + 1. 通信转发 - 与PLC/设备的通信接口 + 2. 物料管理 - 基于PyLabRobot的物料系统 + 3. 工作流控制 - 支持动态注册和静态预定义工作流 + 4. 子设备管理 - 继承自ROS2ProtocolNode的设备管理能力 + 5. 状态监控 - 设备状态和生产数据监控 + 6. 调试接口 - 单点控制和紧急操作 + """ + + def __init__( + self, + device_id: str, + children: Dict[str, Dict[str, Any]], + protocol_type: Union[str, List[str]], + resource_tracker: DeviceNodeResourceTracker, + communication_config: CommunicationConfig, + deck_config: Optional[Dict[str, Any]] = None, + communication_interfaces: Optional[Dict[str, CommunicationInterface]] = None, + *args, + **kwargs, + ): + # 保存工作站特定配置 + self.communication_config = communication_config + self.deck_config = deck_config or {"size_x": 1000.0, "size_y": 1000.0, "size_z": 500.0} + self.communication_interfaces = communication_interfaces or {} + + # 工作流状态 - 支持静态和动态工作流 + self.current_workflow_status = WorkflowStatus.IDLE + self.current_workflow_info = None + self.workflow_start_time = None + self.workflow_parameters = {} + + # 支持的工作流(静态预定义) + self.supported_workflows: Dict[str, WorkflowInfo] = {} + + # 动态注册的工作流 + self.registered_workflows: Dict[str, WorkflowDefinition] = {} + self._workflow_action_servers: Dict[str, ActionServer] = {} + + # 初始化基类 - ROS2ProtocolNode会处理子设备初始化 + super().__init__( + device_id=device_id, + children=children, + protocol_type=protocol_type, + resource_tracker=resource_tracker, + *args, + **kwargs + ) + + # 工作站特有的设备分类 (基于已初始化的sub_devices) + self.communication_devices: Dict[str, Any] = {} + self.logical_devices: Dict[str, Any] = {} + self._classify_devices() + + # 初始化工作站模块 + self.communication: WorkstationCommunicationBase = self._create_communication_module() + self.material_management: MaterialManagementBase = self._create_material_management_module() + + # 设置工作站特定的通信接口 + self._setup_workstation_communication_interfaces() + + # 注册支持的工作流 + self._register_supported_workflows() + + # 创建工作站ROS服务 + self._create_workstation_services() + + # 启动状态监控 + self._start_status_monitoring() + + logger.info(f"增强工作站基类 {device_id} 初始化完成") + + def _classify_devices(self): + """基于已初始化的设备进行分类""" + for device_id, device in self.sub_devices.items(): + device_config = self.children.get(device_id, {}) + device_type = DeviceType(device_config.get("device_type", "logical")) + + if device_type == DeviceType.COMMUNICATION: + self.communication_devices[device_id] = device + logger.info(f"通信设备 {device_id} 已分类") + elif device_type == DeviceType.LOGICAL: + self.logical_devices[device_id] = device + logger.info(f"逻辑设备 {device_id} 已分类") + + def _setup_workstation_communication_interfaces(self): + """设置工作站特定的通信接口代理""" + for logical_device_id, logical_device in self.logical_devices.items(): + # 检查是否有配置的通信接口 + interface_config = self.communication_interfaces.get(logical_device_id) + if not interface_config: + continue + + comm_device = self.communication_devices.get(interface_config.device_id) + if not comm_device: + logger.error(f"通信设备 {interface_config.device_id} 不存在") + continue + + # 设置工作站级别的通信代理 + self._setup_workstation_hardware_proxy( + logical_device, + comm_device, + interface_config + ) + + def _setup_workstation_hardware_proxy(self, logical_device, comm_device, interface: CommunicationInterface): + """为逻辑设备设置工作站级通信代理""" + try: + # 获取通信设备的读写方法 + read_func = getattr(comm_device.driver_instance, interface.read_method, None) + write_func = getattr(comm_device.driver_instance, interface.write_method, None) + + if read_func: + setattr(logical_device.driver_instance, 'comm_read', read_func) + if write_func: + setattr(logical_device.driver_instance, 'comm_write', write_func) + + # 设置通信配置 + setattr(logical_device.driver_instance, 'comm_config', interface.config) + setattr(logical_device.driver_instance, 'comm_protocol', interface.protocol_type) + + logger.info(f"为逻辑设备 {logical_device.device_id} 设置工作站通信代理 -> {comm_device.device_id}") + + except Exception as e: + logger.error(f"设置工作站通信代理失败: {e}") + + @abstractmethod + def _create_communication_module(self) -> WorkstationCommunicationBase: + """创建通信模块 - 子类必须实现""" + pass + + @abstractmethod + def _create_material_management_module(self) -> MaterialManagementBase: + """创建物料管理模块 - 子类必须实现""" + pass + + @abstractmethod + def _register_supported_workflows(self): + """注册支持的工作流 - 子类必须实现""" + pass + + def _create_workstation_services(self): + """创建工作站ROS服务""" + self._workstation_services = { + # 动态工作流管理服务 + "register_workflow": self.create_service( + SerialCommand, + f"/srv{self.namespace}/register_workflow", + self._handle_register_workflow, + callback_group=self.callback_group, + ), + "unregister_workflow": self.create_service( + SerialCommand, + f"/srv{self.namespace}/unregister_workflow", + self._handle_unregister_workflow, + callback_group=self.callback_group, + ), + "list_workflows": self.create_service( + SerialCommand, + f"/srv{self.namespace}/list_workflows", + self._handle_list_workflows, + callback_group=self.callback_group, + ), + + # 增强物料管理服务 + "create_resource": self.create_service( + SerialCommand, + f"/srv{self.namespace}/create_resource", + self._handle_create_resource, + callback_group=self.callback_group, + ), + "delete_resource": self.create_service( + SerialCommand, + f"/srv{self.namespace}/delete_resource", + self._handle_delete_resource, + callback_group=self.callback_group, + ), + "update_resource": self.create_service( + SerialCommand, + f"/srv{self.namespace}/update_resource", + self._handle_update_resource, + callback_group=self.callback_group, + ), + "get_resource": self.create_service( + SerialCommand, + f"/srv{self.namespace}/get_resource", + self._handle_get_resource, + callback_group=self.callback_group, + ), + + # 工作站特有服务 + "start_workflow": self.create_service( + SerialCommand, + f"/srv{self.namespace}/start_workflow", + self._handle_start_workflow, + callback_group=self.callback_group, + ), + "stop_workflow": self.create_service( + SerialCommand, + f"/srv{self.namespace}/stop_workflow", + self._handle_stop_workflow, + callback_group=self.callback_group, + ), + "get_workflow_status": self.create_service( + SerialCommand, + f"/srv{self.namespace}/get_workflow_status", + self._handle_get_workflow_status, + callback_group=self.callback_group, + ), + "get_device_status": self.create_service( + SerialCommand, + f"/srv{self.namespace}/get_device_status", + self._handle_get_device_status, + callback_group=self.callback_group, + ), + "get_production_data": self.create_service( + SerialCommand, + f"/srv{self.namespace}/get_production_data", + self._handle_get_production_data, + callback_group=self.callback_group, + ), + "get_material_inventory": self.create_service( + SerialCommand, + f"/srv{self.namespace}/get_material_inventory", + self._handle_get_material_inventory, + callback_group=self.callback_group, + ), + "find_materials": self.create_service( + SerialCommand, + f"/srv{self.namespace}/find_materials", + self._handle_find_materials, + callback_group=self.callback_group, + ), + "write_register": self.create_service( + SerialCommand, + f"/srv{self.namespace}/write_register", + self._handle_write_register, + callback_group=self.callback_group, + ), + "read_register": self.create_service( + SerialCommand, + f"/srv{self.namespace}/read_register", + self._handle_read_register, + callback_group=self.callback_group, + ), + "emergency_stop": self.create_service( + SerialCommand, + f"/srv{self.namespace}/emergency_stop", + self._handle_emergency_stop, + callback_group=self.callback_group, + ), + } + + def _start_status_monitoring(self): + """启动状态监控""" + # 这里可以启动定期状态查询线程 + # 目前简化为按需查询 + pass + + # ============ 工作流控制接口 ============ + + def _handle_start_workflow(self, request, response): + """处理启动工作流请求""" + try: + import json + + # 解析请求参数 + params = json.loads(request.data) if request.data else {} + workflow_type = params.get("workflow_type", "") + workflow_parameters = params.get("parameters", {}) + + if not workflow_type: + response.success = False + response.message = "缺少工作流类型参数" + return response + + if workflow_type not in self.supported_workflows: + response.success = False + response.message = f"不支持的工作流类型: {workflow_type}" + return response + + if self.current_workflow_status != WorkflowStatus.IDLE: + response.success = False + response.message = f"当前状态不允许启动工作流: {self.current_workflow_status.value}" + return response + + # 启动工作流 + success = self.start_workflow(workflow_type, workflow_parameters) + + response.success = success + response.message = "工作流启动成功" if success else "工作流启动失败" + response.data = json.dumps({ + "workflow_type": workflow_type, + "status": self.current_workflow_status.value, + "estimated_duration": self.supported_workflows[workflow_type].estimated_duration + }) + + except Exception as e: + logger.error(f"处理启动工作流请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + def _handle_stop_workflow(self, request, response): + """处理停止工作流请求""" + try: + import json + + params = json.loads(request.data) if request.data else {} + emergency = params.get("emergency", False) + + success = self.stop_workflow(emergency) + + response.success = success + response.message = "工作流停止成功" if success else "工作流停止失败" + response.data = json.dumps({ + "status": self.current_workflow_status.value, + "emergency": emergency + }) + + except Exception as e: + logger.error(f"处理停止工作流请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + def _handle_get_workflow_status(self, request, response): + """处理获取工作流状态请求""" + try: + import json + import time + + status_info = { + "status": self.current_workflow_status.value, + "workflow_info": self.current_workflow_info.name if self.current_workflow_info else None, + "start_time": self.workflow_start_time, + "parameters": self.workflow_parameters, + "supported_workflows": { + name: { + "description": info.description, + "estimated_duration": info.estimated_duration, + "required_materials": info.required_materials, + "output_product": info.output_product + } + for name, info in self.supported_workflows.items() + } + } + + # 如果工作流正在运行,添加进度信息 + if self.current_workflow_status == WorkflowStatus.RUNNING and self.workflow_start_time: + elapsed_time = time.time() - self.workflow_start_time + estimated_duration = self.current_workflow_info.estimated_duration if self.current_workflow_info else 0 + progress = min(elapsed_time / estimated_duration * 100, 99) if estimated_duration > 0 else 0 + + status_info.update({ + "elapsed_time": elapsed_time, + "estimated_remaining": max(estimated_duration - elapsed_time, 0), + "progress_percent": progress + }) + + # 查询PLC状态 + plc_status = self.communication.get_workflow_status() + status_info["plc_status"] = plc_status + + response.success = True + response.message = "获取状态成功" + response.data = json.dumps(status_info) + + except Exception as e: + logger.error(f"处理获取工作流状态请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + # ============ 设备状态接口 ============ + + def _handle_get_device_status(self, request, response): + """处理获取设备状态请求""" + try: + import json + + # 从通信模块获取设备状态 + device_status = self.communication.get_device_status() + + response.success = True + response.message = "获取设备状态成功" + response.data = json.dumps(device_status) + + except Exception as e: + logger.error(f"处理获取设备状态请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + def _handle_get_production_data(self, request, response): + """处理获取生产数据请求""" + try: + import json + + # 从通信模块获取生产数据 + production_data = self.communication.get_production_data() + + response.success = True + response.message = "获取生产数据成功" + response.data = json.dumps(production_data) + + except Exception as e: + logger.error(f"处理获取生产数据请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + # ============ 物料管理接口 ============ + + def _handle_get_material_inventory(self, request, response): + """处理获取物料库存请求""" + try: + import json + + # 从物料管理模块获取库存 + inventory = self.material_management.get_material_inventory() + deck_state = self.material_management.get_deck_state() + + result = { + "inventory": inventory, + "deck_state": deck_state + } + + response.success = True + response.message = "获取物料库存成功" + response.data = json.dumps(result) + + except Exception as e: + logger.error(f"处理获取物料库存请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + def _handle_find_materials(self, request, response): + """处理查找物料请求""" + try: + import json + + params = json.loads(request.data) if request.data else {} + material_type = params.get("material_type", "") + category = params.get("category", "") + name_pattern = params.get("name_pattern", "") + + found_materials = [] + + if material_type: + materials = self.material_management.find_materials_by_type(material_type) + found_materials.extend([self.material_management.convert_to_unilab_format(m) for m in materials]) + + if category: + materials = self.material_management.resource_tracker.find_by_category(category) + found_materials.extend([self.material_management.convert_to_unilab_format(m) for m in materials]) + + if name_pattern: + materials = self.material_management.resource_tracker.find_by_name_pattern(name_pattern) + found_materials.extend([self.material_management.convert_to_unilab_format(m) for m in materials]) + + response.success = True + response.message = f"找到 {len(found_materials)} 个物料" + response.data = json.dumps({"materials": found_materials}) + + except Exception as e: + logger.error(f"处理查找物料请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + # ============ 调试控制接口 ============ + + def _handle_write_register(self, request, response): + """处理写寄存器请求""" + try: + import json + from unilabos.device_comms.modbus_plc.node.modbus import DataType, WorderOrder + + params = json.loads(request.data) if request.data else {} + register_name = params.get("register_name", "") + value = params.get("value") + data_type_str = params.get("data_type", "") + word_order_str = params.get("word_order", "") + + if not register_name or value is None: + response.success = False + response.message = "缺少寄存器名称或值" + return response + + # 转换数据类型和字节序 + data_type = DataType[data_type_str] if data_type_str else None + word_order = WorderOrder[word_order_str] if word_order_str else None + + success = self.communication.write_register(register_name, value, data_type, word_order) + + response.success = success + response.message = "写寄存器成功" if success else "写寄存器失败" + response.data = json.dumps({ + "register_name": register_name, + "value": value, + "data_type": data_type_str, + "word_order": word_order_str + }) + + except Exception as e: + logger.error(f"处理写寄存器请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + def _handle_read_register(self, request, response): + """处理读寄存器请求""" + try: + import json + from unilabos.device_comms.modbus_plc.node.modbus import DataType, WorderOrder + + params = json.loads(request.data) if request.data else {} + register_name = params.get("register_name", "") + count = params.get("count", 1) + data_type_str = params.get("data_type", "") + word_order_str = params.get("word_order", "") + + if not register_name: + response.success = False + response.message = "缺少寄存器名称" + return response + + # 转换数据类型和字节序 + data_type = DataType[data_type_str] if data_type_str else None + word_order = WorderOrder[word_order_str] if word_order_str else None + + value, error = self.communication.read_register(register_name, count, data_type, word_order) + + response.success = not error + response.message = "读寄存器成功" if not error else "读寄存器失败" + response.data = json.dumps({ + "register_name": register_name, + "value": value, + "error": error, + "data_type": data_type_str, + "word_order": word_order_str + }) + + except Exception as e: + logger.error(f"处理读寄存器请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + def _handle_emergency_stop(self, request, response): + """处理紧急停止请求""" + try: + import json + + # 立即停止工作流 + success = self.stop_workflow(emergency=True) + + # 更新状态 + if success: + self.current_workflow_status = WorkflowStatus.STOPPED + + response.success = success + response.message = "紧急停止成功" if success else "紧急停止失败" + response.data = json.dumps({ + "status": self.current_workflow_status.value, + "timestamp": time.time() + }) + + except Exception as e: + logger.error(f"处理紧急停止请求失败: {e}") + response.success = False + response.message = f"处理请求失败: {str(e)}" + + return response + + # ============ 工作流控制方法 ============ + + def start_workflow(self, workflow_type: str, parameters: Dict[str, Any] = None) -> bool: + """启动工作流""" + try: + if workflow_type not in self.supported_workflows: + logger.error(f"不支持的工作流类型: {workflow_type}") + return False + + if self.current_workflow_status != WorkflowStatus.IDLE: + logger.error(f"当前状态不允许启动工作流: {self.current_workflow_status}") + return False + + # 更新状态 + self.current_workflow_status = WorkflowStatus.INITIALIZING + self.current_workflow_info = self.supported_workflows[workflow_type] + self.workflow_parameters = parameters or {} + + # 通过通信模块启动工作流 + success = self.communication.start_workflow(workflow_type, self.workflow_parameters) + + if success: + self.current_workflow_status = WorkflowStatus.RUNNING + self.workflow_start_time = time.time() + logger.info(f"工作流启动成功: {workflow_type}") + else: + self.current_workflow_status = WorkflowStatus.ERROR + logger.error(f"工作流启动失败: {workflow_type}") + + return success + + except Exception as e: + logger.error(f"启动工作流失败: {e}") + self.current_workflow_status = WorkflowStatus.ERROR + return False + + def stop_workflow(self, emergency: bool = False) -> bool: + """停止工作流""" + try: + if self.current_workflow_status in [WorkflowStatus.IDLE, WorkflowStatus.STOPPED]: + logger.warning("没有正在运行的工作流") + return True + + # 更新状态 + self.current_workflow_status = WorkflowStatus.STOPPING + + # 通过通信模块停止工作流 + success = self.communication.stop_workflow(emergency) + + if success: + self.current_workflow_status = WorkflowStatus.STOPPED + logger.info(f"工作流停止成功 (紧急: {emergency})") + else: + self.current_workflow_status = WorkflowStatus.ERROR + logger.error(f"工作流停止失败 (紧急: {emergency})") + + return success + + except Exception as e: + logger.error(f"停止工作流失败: {e}") + self.current_workflow_status = WorkflowStatus.ERROR + return False + + # ============ 状态属性 ============ + + @property + def is_busy(self) -> bool: + """是否忙碌""" + return self.current_workflow_status in [ + WorkflowStatus.INITIALIZING, + WorkflowStatus.RUNNING, + WorkflowStatus.STOPPING + ] + + @property + def is_ready(self) -> bool: + """是否就绪""" + return self.current_workflow_status == WorkflowStatus.IDLE + + @property + def has_error(self) -> bool: + """是否有错误""" + return self.current_workflow_status == WorkflowStatus.ERROR + + @property + def communication_status(self) -> Dict[str, Any]: + """通信状态""" + return { + "is_connected": self.communication.is_connected, + "host": self.communication.config.host, + "port": self.communication.config.port, + "protocol": self.communication.config.protocol.value + } + + @property + def material_status(self) -> Dict[str, Any]: + """物料状态""" + return { + "total_resources": len(self.material_management.plr_resources), + "inventory": self.material_management.get_material_inventory(), + "deck_size": { + "x": self.material_management.plr_deck.size_x, + "y": self.material_management.plr_deck.size_y, + "z": self.material_management.plr_deck.size_z + } + } + + # ============ 增强物料管理接口 ============ + + def _handle_create_resource(self, request, response): + """处理创建物料请求""" + try: + data = json.loads(request.data) if request.data else {} + result = self.create_resource( + resource_data=data.get("resource_data"), + parent_id=data.get("parent_id"), + location=data.get("location"), + metadata=data.get("metadata", {}) + ) + response.success = True + response.message = "创建物料成功" + response.data = serialize_result_info("", True, result) + except Exception as e: + error_msg = f"创建物料失败: {e}\n{traceback.format_exc()}" + logger.error(error_msg) + response.success = False + response.message = error_msg + response.data = serialize_result_info(error_msg, False, {}) + return response + + def create_resource(self, resource_data: Dict[str, Any], parent_id: Optional[str] = None, + location: Optional[Dict[str, float]] = None, metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + """创建物料资源""" + try: + # 验证资源数据 + if not self._validate_resource_data(resource_data): + raise ValueError("无效的资源数据") + + # 添加到本地资源跟踪器 + resource = convert_to_ros_msg(Resource, resource_data) + self.resource_tracker.add_resource(resource) + + # 如果有父节点,建立关联 + if parent_id: + self._link_resource_to_parent(resource_data["id"], parent_id, location) + + # 同步到全局资源管理器 + self._sync_resource_to_global(resource, "create") + + logger.info(f"物料 {resource_data['id']} 创建成功") + return {"resource_id": resource_data["id"], "status": "created"} + + except Exception as e: + logger.error(f"创建物料失败: {e}") + raise + + def _handle_delete_resource(self, request, response): + """处理删除物料请求""" + try: + data = json.loads(request.data) if request.data else {} + result = self.delete_resource(data.get("resource_id")) + response.success = True + response.message = "删除物料成功" + response.data = serialize_result_info("", True, result) + except Exception as e: + error_msg = f"删除物料失败: {e}\n{traceback.format_exc()}" + logger.error(error_msg) + response.success = False + response.message = error_msg + response.data = serialize_result_info(error_msg, False, {}) + return response + + def delete_resource(self, resource_id: str) -> Dict[str, Any]: + """删除物料资源""" + try: + # 从本地资源跟踪器删除 + resources = self.resource_tracker.figure_resource({"id": resource_id}) + if not resources: + raise ValueError(f"资源 {resource_id} 不存在") + + # 同步到全局资源管理器 + self._sync_resource_to_global(resources[0], "delete") + + logger.info(f"物料 {resource_id} 删除成功") + return {"resource_id": resource_id, "status": "deleted"} + + except Exception as e: + logger.error(f"删除物料失败: {e}") + raise + + def _handle_update_resource(self, request, response): + """处理更新物料请求""" + try: + data = json.loads(request.data) if request.data else {} + result = self.update_resource( + resource_id=data.get("resource_id"), + updates=data.get("updates", {}) + ) + response.success = True + response.message = "更新物料成功" + response.data = serialize_result_info("", True, result) + except Exception as e: + error_msg = f"更新物料失败: {e}\n{traceback.format_exc()}" + logger.error(error_msg) + response.success = False + response.message = error_msg + response.data = serialize_result_info(error_msg, False, {}) + return response + + def update_resource(self, resource_id: str, updates: Dict[str, Any]) -> Dict[str, Any]: + """更新物料资源""" + try: + # 查找资源 + resources = self.resource_tracker.figure_resource({"id": resource_id}) + if not resources: + raise ValueError(f"资源 {resource_id} 不存在") + + resource = resources[0] + + # 更新资源数据 + if isinstance(resource, Resource): + if "data" in updates: + current_data = json.loads(resource.data) if resource.data else {} + current_data.update(updates["data"]) + resource.data = json.dumps(current_data) + + for key, value in updates.items(): + if key != "data" and hasattr(resource, key): + setattr(resource, key, value) + + # 同步到全局资源管理器 + self._sync_resource_to_global(resource, "update") + + logger.info(f"物料 {resource_id} 更新成功") + return {"resource_id": resource_id, "status": "updated"} + + except Exception as e: + logger.error(f"更新物料失败: {e}") + raise + + def _handle_get_resource(self, request, response): + """处理获取物料请求""" + try: + data = json.loads(request.data) if request.data else {} + result = self.get_resource( + resource_id=data.get("resource_id"), + with_children=data.get("with_children", False) + ) + response.success = True + response.message = "获取物料成功" + response.data = serialize_result_info("", True, result) + except Exception as e: + error_msg = f"获取物料失败: {e}\n{traceback.format_exc()}" + logger.error(error_msg) + response.success = False + response.message = error_msg + response.data = serialize_result_info(error_msg, False, {}) + return response + + def get_resource(self, resource_id: str, with_children: bool = False) -> Dict[str, Any]: + """获取物料资源""" + try: + resources = self.resource_tracker.figure_resource({"id": resource_id}) + if not resources: + raise ValueError(f"资源 {resource_id} 不存在") + + resource = resources[0] + + # 转换为字典格式 + if isinstance(resource, Resource): + result = convert_from_ros_msg(resource) + else: + result = resource + + # 如果需要包含子资源 + if with_children: + children = self._get_child_resources(resource_id) + result["children"] = children + + return result + + except Exception as e: + logger.error(f"获取物料失败: {e}") + raise + + # ============ 动态工作流管理接口 ============ + + def _handle_register_workflow(self, request, response): + """处理注册工作流请求""" + try: + data = json.loads(request.data) if request.data else {} + result = self.register_workflow( + workflow_name=data.get("workflow_name"), + workflow_definition=data.get("workflow_definition"), + action_type=data.get("action_type") + ) + response.success = True + response.message = "注册工作流成功" + response.data = serialize_result_info("", True, result) + except Exception as e: + error_msg = f"注册工作流失败: {e}\n{traceback.format_exc()}" + logger.error(error_msg) + response.success = False + response.message = error_msg + response.data = serialize_result_info(error_msg, False, {}) + return response + + def register_workflow(self, workflow_name: str, workflow_definition: Dict[str, Any], + action_type: Optional[str] = None) -> Dict[str, Any]: + """注册工作流并创建对应的ROS Action""" + try: + # 验证工作流定义 + if not self._validate_workflow_definition(workflow_definition): + raise ValueError("无效的工作流定义") + + # 创建工作流定义对象 + workflow = WorkflowDefinition( + name=workflow_name, + description=workflow_definition.get("description", ""), + steps=[WorkflowStep(**step) for step in workflow_definition.get("steps", [])], + input_schema=workflow_definition.get("input_schema", {}), + output_schema=workflow_definition.get("output_schema", {}), + metadata=workflow_definition.get("metadata", {}) + ) + + # 存储工作流定义 + self.registered_workflows[workflow_name] = workflow + + # 创建对应的ROS Action Server + self._create_workflow_action_server(workflow_name, workflow, action_type) + + logger.info(f"工作流 {workflow_name} 注册成功") + return {"workflow_name": workflow_name, "status": "registered"} + + except Exception as e: + logger.error(f"注册工作流失败: {e}") + raise + + def _create_workflow_action_server(self, workflow_name: str, workflow: WorkflowDefinition, action_type: Optional[str]): + """为工作流创建ROS Action Server""" + try: + # 如果没有指定action_type,使用默认的SendCmd + if not action_type: + from unilabos_msgs.action import SendCmd + action_type_class = SendCmd + else: + # 动态导入指定的action类型 + action_type_class = self._import_action_type(action_type) + + # 创建Action Server + self._workflow_action_servers[workflow_name] = ActionServer( + self, + action_type_class, + workflow_name, + execute_callback=self._create_workflow_execute_callback(workflow), + callback_group=ReentrantCallbackGroup(), + ) + + logger.info(f"为工作流 {workflow_name} 创建Action Server") + + except Exception as e: + logger.error(f"创建工作流Action Server失败: {e}") + raise + + def _create_workflow_execute_callback(self, workflow: WorkflowDefinition): + """创建工作流执行回调""" + async def execute_workflow(goal_handle: ServerGoalHandle): + execution_error = "" + execution_success = False + workflow_return_value = None + + try: + logger.info(f"开始执行工作流: {workflow.name}") + + # 解析输入参数 + goal = goal_handle.request + workflow_kwargs = self._parse_workflow_input(goal, workflow.input_schema) + + # 执行工作流步骤 + step_results = [] + for step in workflow.steps: + # 检查依赖 + if step.depends_on: + self._wait_for_dependencies(step.depends_on, step_results) + + # 执行步骤 + step_result = await self._execute_workflow_step(step, workflow_kwargs) + step_results.append(step_result) + + # 发布反馈 + feedback = self._create_workflow_feedback(workflow, step_results) + goal_handle.publish_feedback(feedback) + + execution_success = True + workflow_return_value = { + "workflow_name": workflow.name, + "steps_completed": len(step_results), + "results": step_results + } + + goal_handle.succeed() + + except Exception as e: + execution_error = traceback.format_exc() + execution_success = False + logger.error(f"工作流执行失败: {e}") + goal_handle.abort() + + # 创建结果 + result = goal_handle._action_type.Result() + result.success = execution_success + + # 如果有return_info字段,设置详细信息 + if hasattr(result, 'return_info'): + result.return_info = serialize_result_info(execution_error, execution_success, workflow_return_value) + + return result + + return execute_workflow + + async def _execute_workflow_step(self, step: WorkflowStep, workflow_kwargs: Dict[str, Any]) -> Dict[str, Any]: + """执行单个工作流步骤 - 使用父类的execute_single_action方法""" + try: + # 替换参数中的变量 + resolved_kwargs = self._resolve_step_kwargs(step.action_kwargs, workflow_kwargs) + + # 使用父类的execute_single_action方法执行动作 + result = await self.execute_single_action( + device_id=step.device_id, + action_name=step.action_name, + action_kwargs=resolved_kwargs + ) + + return { + "step_id": step.step_id or f"{step.device_id}_{step.action_name}", + "device_id": step.device_id, + "action_name": step.action_name, + "status": "success", + "result": result + } + + except Exception as e: + logger.error(f"步骤执行失败: {step.step_id}, 错误: {e}") + return { + "step_id": step.step_id or f"{step.device_id}_{step.action_name}", + "device_id": step.device_id, + "action_name": step.action_name, + "status": "failed", + "error": str(e) + } + + def _handle_unregister_workflow(self, request, response): + """处理注销工作流请求""" + try: + data = json.loads(request.data) if request.data else {} + workflow_name = data.get("workflow_name") + + if workflow_name in self.registered_workflows: + del self.registered_workflows[workflow_name] + + if workflow_name in self._workflow_action_servers: + # 销毁Action Server + del self._workflow_action_servers[workflow_name] + + result = {"workflow_name": workflow_name, "status": "unregistered"} + response.success = True + response.message = "注销工作流成功" + response.data = serialize_result_info("", True, result) + else: + raise ValueError(f"工作流 {workflow_name} 不存在") + + except Exception as e: + error_msg = f"注销工作流失败: {e}" + logger.error(error_msg) + response.success = False + response.message = error_msg + response.data = serialize_result_info(error_msg, False, {}) + return response + + def _handle_list_workflows(self, request, response): + """处理列出工作流请求""" + try: + # 静态预定义工作流 + static_workflows = [] + for name, workflow in self.supported_workflows.items(): + static_workflows.append({ + "name": name, + "type": "static", + "description": workflow.description, + "estimated_duration": workflow.estimated_duration, + "required_materials": workflow.required_materials, + "output_product": workflow.output_product + }) + + # 动态注册工作流 + dynamic_workflows = [] + for name, workflow in self.registered_workflows.items(): + dynamic_workflows.append({ + "name": name, + "type": "dynamic", + "description": workflow.description, + "step_count": len(workflow.steps), + "metadata": workflow.metadata + }) + + result = { + "static_workflows": static_workflows, + "dynamic_workflows": dynamic_workflows, + "total_count": len(static_workflows) + len(dynamic_workflows) + } + response.success = True + response.message = "列出工作流成功" + response.data = serialize_result_info("", True, result) + except Exception as e: + error_msg = f"列出工作流失败: {e}" + logger.error(error_msg) + response.success = False + response.message = error_msg + response.data = serialize_result_info(error_msg, False, {}) + return response + + # ============ 辅助方法 ============ + + def _validate_resource_data(self, resource_data: Dict[str, Any]) -> bool: + """验证资源数据""" + required_fields = ["id", "name", "type"] + return all(field in resource_data for field in required_fields) + + def _validate_workflow_definition(self, workflow_def: Dict[str, Any]) -> bool: + """验证工作流定义""" + required_fields = ["steps"] + return all(field in workflow_def for field in required_fields) + + def _sync_resource_to_global(self, resource: Resource, operation: str): + """同步资源到全局管理器""" + # 实现与全局资源管理器的同步逻辑 + pass + + def _link_resource_to_parent(self, resource_id: str, parent_id: str, location: Optional[Dict[str, float]]): + """将资源链接到父节点""" + # 实现资源父子关系的建立逻辑 + pass + + def _get_child_resources(self, resource_id: str) -> List[Dict[str, Any]]: + """获取子资源""" + # 实现获取子资源的逻辑 + return [] + + def _import_action_type(self, action_type: str): + """动态导入Action类型""" + # 实现动态导入逻辑 + from unilabos_msgs.action import SendCmd + return SendCmd + + def _parse_workflow_input(self, goal, input_schema: Dict[str, Any]) -> Dict[str, Any]: + """解析工作流输入""" + # 根据input_schema解析goal中的参数 + return {} + + def _wait_for_dependencies(self, dependencies: List[str], completed_steps: List[Dict[str, Any]]): + """等待依赖步骤完成""" + # 实现依赖等待逻辑 + pass + + def _resolve_step_kwargs(self, action_kwargs: Dict[str, Any], workflow_kwargs: Dict[str, Any]) -> Dict[str, Any]: + """解析步骤参数中的变量""" + # 实现参数变量替换逻辑 + return action_kwargs + + def _create_workflow_feedback(self, workflow: WorkflowDefinition, step_results: List[Dict[str, Any]]): + """创建工作流反馈""" + # 创建反馈消息 + return None + + # ============ 增强状态属性 ============ + + @property + def communication_device_count(self) -> int: + """通信设备数量""" + return len(self.communication_devices) + + @property + def logical_device_count(self) -> int: + """逻辑设备数量""" + return len(self.logical_devices) + + @property + def active_dynamic_workflows(self) -> int: + """活跃动态工作流数量""" + return len([server for server in self._workflow_action_servers.values() if server]) + + @property + def total_workflow_count(self) -> int: + """总工作流数量""" + return len(self.supported_workflows) + len(self.registered_workflows) + + @property + def workstation_resource_count(self) -> int: + """工作站资源数量""" + return len(self.resource_tracker.figure_resource({})) + + @property + def workstation_status_summary(self) -> Dict[str, Any]: + """工作站状态摘要""" + return { + "workflow_status": self.current_workflow_status.value, + "is_busy": self.is_busy, + "is_ready": self.is_ready, + "has_error": self.has_error, + "total_devices": len(self.sub_devices), + "communication_devices": self.communication_device_count, + "logical_devices": self.logical_device_count, + "total_workflows": self.total_workflow_count, + "active_workflows": self.active_dynamic_workflows, + "total_resources": self.workstation_resource_count, + "communication_status": self.communication_status, + "material_status": self.material_status + } diff --git a/unilabos/device_comms/workstation_communication.py b/unilabos/device_comms/workstation_communication.py new file mode 100644 index 00000000..3067b802 --- /dev/null +++ b/unilabos/device_comms/workstation_communication.py @@ -0,0 +1,600 @@ +""" +工作站通信基类 +Workstation Communication Base Class + +从具体设备驱动中抽取通用通信模式 +""" +import json +import time +import threading +from typing import Dict, Any, Optional, Callable, Union, List +from abc import ABC, abstractmethod +from dataclasses import dataclass +from enum import Enum + +from unilabos.device_comms.modbus_plc.client import TCPClient as ModbusTCPClient +from unilabos.device_comms.modbus_plc.node.modbus import DataType, WorderOrder +from unilabos.utils.log import logger + + +class CommunicationProtocol(Enum): + """通信协议类型""" + MODBUS_TCP = "modbus_tcp" + MODBUS_RTU = "modbus_rtu" + SERIAL = "serial" + ETHERNET = "ethernet" + + +@dataclass +class CommunicationConfig: + """通信配置""" + protocol: CommunicationProtocol + host: str + port: int + timeout: float = 5.0 + retry_count: int = 3 + extra_params: Dict[str, Any] = None + + +class WorkstationCommunicationBase(ABC): + """工作站通信基类 + + 定义工作站通信的标准接口: + 1. 状态查询 - 定期获取设备状态 + 2. 命令下发 - 发送控制指令 + 3. 数据采集 - 收集生产数据 + 4. 紧急控制 - 单点调试控制 + """ + + def __init__(self, communication_config: CommunicationConfig): + self.config = communication_config + self.client = None + self.is_connected = False + self.last_status = {} + self.data_export_thread = None + self.data_export_running = False + + # 状态缓存 + self._status_cache = {} + self._last_update_time = 0 + self._cache_timeout = 1.0 # 缓存1秒 + + self._initialize_communication() + + @abstractmethod + def _initialize_communication(self): + """初始化通信连接""" + pass + + @abstractmethod + def _load_address_mapping(self) -> Dict[str, Any]: + """加载地址映射表""" + pass + + def connect(self) -> bool: + """建立连接""" + try: + if self.config.protocol == CommunicationProtocol.MODBUS_TCP: + self.client = ModbusTCPClient( + addr=self.config.host, + port=self.config.port + ) + self.client.client.connect() + + # 等待连接建立 + count = 100 + while count > 0: + count -= 1 + if self.client.client.is_socket_open(): + self.is_connected = True + logger.info(f"工作站通信连接成功: {self.config.host}:{self.config.port}") + return True + time.sleep(0.1) + + if not self.client.client.is_socket_open(): + raise ConnectionError(f"无法连接到工作站: {self.config.host}:{self.config.port}") + + else: + raise NotImplementedError(f"协议 {self.config.protocol} 暂未实现") + + except Exception as e: + logger.error(f"工作站通信连接失败: {e}") + self.is_connected = False + return False + + def disconnect(self): + """断开连接""" + try: + if self.client and hasattr(self.client, 'client'): + self.client.client.close() + self.is_connected = False + logger.info("工作站通信连接已断开") + except Exception as e: + logger.error(f"断开连接时出错: {e}") + + # ============ 标准工作流接口 ============ + + def start_workflow(self, workflow_type: str, parameters: Dict[str, Any] = None) -> bool: + """启动工作流""" + try: + if not self.is_connected: + logger.error("通信未连接,无法启动工作流") + return False + + logger.info(f"启动工作流: {workflow_type}, 参数: {parameters}") + return self._execute_start_workflow(workflow_type, parameters or {}) + + except Exception as e: + logger.error(f"启动工作流失败: {e}") + return False + + def stop_workflow(self, emergency: bool = False) -> bool: + """停止工作流""" + try: + if not self.is_connected: + logger.error("通信未连接,无法停止工作流") + return False + + logger.info(f"停止工作流 (紧急: {emergency})") + return self._execute_stop_workflow(emergency) + + except Exception as e: + logger.error(f"停止工作流失败: {e}") + return False + + def get_workflow_status(self) -> Dict[str, Any]: + """获取工作流状态""" + try: + if not self.is_connected: + return {"error": "通信未连接"} + + return self._query_workflow_status() + + except Exception as e: + logger.error(f"查询工作流状态失败: {e}") + return {"error": str(e)} + + # ============ 设备状态查询接口 ============ + + def get_device_status(self, force_refresh: bool = False) -> Dict[str, Any]: + """获取设备状态(带缓存)""" + current_time = time.time() + + if not force_refresh and (current_time - self._last_update_time) < self._cache_timeout: + return self._status_cache + + try: + if not self.is_connected: + return {"error": "通信未连接"} + + status = self._query_device_status() + self._status_cache = status + self._last_update_time = current_time + return status + + except Exception as e: + logger.error(f"查询设备状态失败: {e}") + return {"error": str(e)} + + def get_production_data(self) -> Dict[str, Any]: + """获取生产数据""" + try: + if not self.is_connected: + return {"error": "通信未连接"} + + return self._query_production_data() + + except Exception as e: + logger.error(f"查询生产数据失败: {e}") + return {"error": str(e)} + + # ============ 单点控制接口(调试用)============ + + def write_register(self, register_name: str, value: Any, data_type: DataType = None, word_order: WorderOrder = None) -> bool: + """写寄存器(单点控制)""" + try: + if not self.is_connected: + logger.error("通信未连接,无法写寄存器") + return False + + return self._write_single_register(register_name, value, data_type, word_order) + + except Exception as e: + logger.error(f"写寄存器失败: {e}") + return False + + def read_register(self, register_name: str, count: int = 1, data_type: DataType = None, word_order: WorderOrder = None) -> tuple: + """读寄存器(单点控制)""" + try: + if not self.is_connected: + logger.error("通信未连接,无法读寄存器") + return None, True + + return self._read_single_register(register_name, count, data_type, word_order) + + except Exception as e: + logger.error(f"读寄存器失败: {e}") + return None, True + + # ============ 数据导出功能 ============ + + def start_data_export(self, file_path: str, export_interval: float = 1.0) -> bool: + """开始数据导出""" + try: + if self.data_export_running: + logger.warning("数据导出已在运行") + return False + + self.data_export_file = file_path + self.data_export_interval = export_interval + self.data_export_running = True + + # 创建CSV文件并写入表头 + self._initialize_export_file(file_path) + + # 启动数据收集线程 + self.data_export_thread = threading.Thread(target=self._data_export_worker) + self.data_export_thread.daemon = True + self.data_export_thread.start() + + logger.info(f"数据导出已启动: {file_path}") + return True + + except Exception as e: + logger.error(f"启动数据导出失败: {e}") + return False + + def stop_data_export(self) -> bool: + """停止数据导出""" + try: + if not self.data_export_running: + logger.warning("数据导出未运行") + return False + + self.data_export_running = False + + if self.data_export_thread and self.data_export_thread.is_alive(): + self.data_export_thread.join(timeout=5.0) + + logger.info("数据导出已停止") + return True + + except Exception as e: + logger.error(f"停止数据导出失败: {e}") + return False + + def _data_export_worker(self): + """数据导出工作线程""" + while self.data_export_running: + try: + data = self.get_production_data() + self._append_to_export_file(data) + time.sleep(self.data_export_interval) + except Exception as e: + logger.error(f"数据导出工作线程错误: {e}") + + # ============ 抽象方法 - 子类必须实现 ============ + + @abstractmethod + def _execute_start_workflow(self, workflow_type: str, parameters: Dict[str, Any]) -> bool: + """执行启动工作流命令""" + pass + + @abstractmethod + def _execute_stop_workflow(self, emergency: bool) -> bool: + """执行停止工作流命令""" + pass + + @abstractmethod + def _query_workflow_status(self) -> Dict[str, Any]: + """查询工作流状态""" + pass + + @abstractmethod + def _query_device_status(self) -> Dict[str, Any]: + """查询设备状态""" + pass + + @abstractmethod + def _query_production_data(self) -> Dict[str, Any]: + """查询生产数据""" + pass + + @abstractmethod + def _write_single_register(self, register_name: str, value: Any, data_type: DataType, word_order: WorderOrder) -> bool: + """写单个寄存器""" + pass + + @abstractmethod + def _read_single_register(self, register_name: str, count: int, data_type: DataType, word_order: WorderOrder) -> tuple: + """读单个寄存器""" + pass + + @abstractmethod + def _initialize_export_file(self, file_path: str): + """初始化导出文件""" + pass + + @abstractmethod + def _append_to_export_file(self, data: Dict[str, Any]): + """追加数据到导出文件""" + pass + + +class CoinCellCommunication(WorkstationCommunicationBase): + """纽扣电池组装系统通信类 + + 从 coin_cell_assembly_system 抽取的通信功能 + """ + + def __init__(self, communication_config: CommunicationConfig, csv_path: str = "./coin_cell_assembly.csv"): + self.csv_path = csv_path + super().__init__(communication_config) + + def _initialize_communication(self): + """初始化通信连接""" + # 加载节点映射 + try: + nodes = self.client.load_csv(self.csv_path) if self.client else [] + if self.client: + self.client.register_node_list(nodes) + except Exception as e: + logger.error(f"加载节点映射失败: {e}") + + def _load_address_mapping(self) -> Dict[str, Any]: + """加载地址映射表""" + # 从CSV文件加载地址映射 + return {} + + def _execute_start_workflow(self, workflow_type: str, parameters: Dict[str, Any]) -> bool: + """执行启动工作流命令""" + if workflow_type == "battery_manufacturing": + # 发送电池制造启动命令 + return self._start_battery_manufacturing(parameters) + else: + logger.error(f"不支持的工作流类型: {workflow_type}") + return False + + def _start_battery_manufacturing(self, parameters: Dict[str, Any]) -> bool: + """启动电池制造工作流""" + try: + # 1. 设置参数 + if "electrolyte_num" in parameters: + self.client.use_node('REG_MSG_ELECTROLYTE_NUM').write(parameters["electrolyte_num"]) + + if "electrolyte_volume" in parameters: + self.client.use_node('REG_MSG_ELECTROLYTE_VOLUME').write( + parameters["electrolyte_volume"], + data_type=DataType.FLOAT32, + word_order=WorderOrder.LITTLE + ) + + if "assembly_pressure" in parameters: + self.client.use_node('REG_MSG_ASSEMBLY_PRESSURE').write( + parameters["assembly_pressure"], + data_type=DataType.FLOAT32, + word_order=WorderOrder.LITTLE + ) + + # 2. 发送启动命令 + self.client.use_node('COIL_SYS_START_CMD').write(True) + + # 3. 确认启动成功 + time.sleep(0.5) + status, read_err = self.client.use_node('COIL_SYS_START_STATUS').read(1) + return not read_err and status[0] + + except Exception as e: + logger.error(f"启动电池制造工作流失败: {e}") + return False + + def _execute_stop_workflow(self, emergency: bool) -> bool: + """执行停止工作流命令""" + try: + if emergency: + # 紧急停止 + self.client.use_node('COIL_SYS_RESET_CMD').write(True) + else: + # 正常停止 + self.client.use_node('COIL_SYS_STOP_CMD').write(True) + + time.sleep(0.5) + status, read_err = self.client.use_node('COIL_SYS_STOP_STATUS').read(1) + return not read_err and status[0] + + except Exception as e: + logger.error(f"停止工作流失败: {e}") + return False + + def _query_workflow_status(self) -> Dict[str, Any]: + """查询工作流状态""" + try: + status = {} + + # 读取系统状态 + start_status, _ = self.client.use_node('COIL_SYS_START_STATUS').read(1) + stop_status, _ = self.client.use_node('COIL_SYS_STOP_STATUS').read(1) + auto_status, _ = self.client.use_node('COIL_SYS_AUTO_STATUS').read(1) + init_status, _ = self.client.use_node('COIL_SYS_INIT_STATUS').read(1) + + status.update({ + "is_running": start_status[0] if start_status else False, + "is_stopped": stop_status[0] if stop_status else False, + "is_auto_mode": auto_status[0] if auto_status else False, + "is_initialized": init_status[0] if init_status else False, + }) + + return status + + except Exception as e: + logger.error(f"查询工作流状态失败: {e}") + return {"error": str(e)} + + def _query_device_status(self) -> Dict[str, Any]: + """查询设备状态""" + try: + status = {} + + # 读取位置信息 + x_pos, _ = self.client.use_node('REG_DATA_AXIS_X_POS').read(2, word_order=WorderOrder.LITTLE) + y_pos, _ = self.client.use_node('REG_DATA_AXIS_Y_POS').read(2, word_order=WorderOrder.LITTLE) + z_pos, _ = self.client.use_node('REG_DATA_AXIS_Z_POS').read(2, word_order=WorderOrder.LITTLE) + + # 读取环境数据 + pressure, _ = self.client.use_node('REG_DATA_GLOVE_BOX_PRESSURE').read(2, word_order=WorderOrder.LITTLE) + o2_content, _ = self.client.use_node('REG_DATA_GLOVE_BOX_O2_CONTENT').read(2, word_order=WorderOrder.LITTLE) + water_content, _ = self.client.use_node('REG_DATA_GLOVE_BOX_WATER_CONTENT').read(2, word_order=WorderOrder.LITTLE) + + status.update({ + "axis_position": { + "x": x_pos[0] if x_pos else 0.0, + "y": y_pos[0] if y_pos else 0.0, + "z": z_pos[0] if z_pos else 0.0, + }, + "environment": { + "glove_box_pressure": pressure[0] if pressure else 0.0, + "o2_content": o2_content[0] if o2_content else 0.0, + "water_content": water_content[0] if water_content else 0.0, + } + }) + + return status + + except Exception as e: + logger.error(f"查询设备状态失败: {e}") + return {"error": str(e)} + + def _query_production_data(self) -> Dict[str, Any]: + """查询生产数据""" + try: + data = {} + + # 读取生产统计 + coin_cell_num, _ = self.client.use_node('REG_DATA_ASSEMBLY_COIN_CELL_NUM').read(1) + assembly_time, _ = self.client.use_node('REG_DATA_ASSEMBLY_TIME').read(2, word_order=WorderOrder.LITTLE) + voltage, _ = self.client.use_node('REG_DATA_OPEN_CIRCUIT_VOLTAGE').read(2, word_order=WorderOrder.LITTLE) + + # 读取当前产品信息 + coin_cell_code, _ = self.client.use_node('REG_DATA_COIN_CELL_CODE').read(20) # 假设是字符串 + electrolyte_code, _ = self.client.use_node('REG_DATA_ELECTROLYTE_CODE').read(20) + + data.update({ + "production_count": coin_cell_num[0] if coin_cell_num else 0, + "assembly_time": assembly_time[0] if assembly_time else 0.0, + "open_circuit_voltage": voltage[0] if voltage else 0.0, + "current_battery_code": self._decode_string(coin_cell_code) if coin_cell_code else "", + "current_electrolyte_code": self._decode_string(electrolyte_code) if electrolyte_code else "", + "timestamp": time.time(), + }) + + return data + + except Exception as e: + logger.error(f"查询生产数据失败: {e}") + return {"error": str(e)} + + def _write_single_register(self, register_name: str, value: Any, data_type: DataType = None, word_order: WorderOrder = None) -> bool: + """写单个寄存器""" + try: + kwargs = {"value": value} + if data_type: + kwargs["data_type"] = data_type + if word_order: + kwargs["word_order"] = word_order + + result = self.client.use_node(register_name).write(**kwargs) + return result + + except Exception as e: + logger.error(f"写寄存器 {register_name} 失败: {e}") + return False + + def _read_single_register(self, register_name: str, count: int = 1, data_type: DataType = None, word_order: WorderOrder = None) -> tuple: + """读单个寄存器""" + try: + kwargs = {"count": count} + if data_type: + kwargs["data_type"] = data_type + if word_order: + kwargs["word_order"] = word_order + + value, error = self.client.use_node(register_name).read(**kwargs) + return value, error + + except Exception as e: + logger.error(f"读寄存器 {register_name} 失败: {e}") + return None, True + + def _initialize_export_file(self, file_path: str): + """初始化导出文件""" + import csv + try: + with open(file_path, 'w', newline='', encoding='utf-8') as csvfile: + fieldnames = [ + 'timestamp', 'production_count', 'assembly_time', + 'open_circuit_voltage', 'battery_code', 'electrolyte_code', + 'axis_x', 'axis_y', 'axis_z', 'glove_box_pressure', + 'o2_content', 'water_content' + ] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + except Exception as e: + logger.error(f"初始化导出文件失败: {e}") + + def _append_to_export_file(self, data: Dict[str, Any]): + """追加数据到导出文件""" + import csv + try: + with open(self.data_export_file, 'a', newline='', encoding='utf-8') as csvfile: + fieldnames = [ + 'timestamp', 'production_count', 'assembly_time', + 'open_circuit_voltage', 'battery_code', 'electrolyte_code', + 'axis_x', 'axis_y', 'axis_z', 'glove_box_pressure', + 'o2_content', 'water_content' + ] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + + row = { + 'timestamp': data.get('timestamp', time.time()), + 'production_count': data.get('production_count', 0), + 'assembly_time': data.get('assembly_time', 0.0), + 'open_circuit_voltage': data.get('open_circuit_voltage', 0.0), + 'battery_code': data.get('current_battery_code', ''), + 'electrolyte_code': data.get('current_electrolyte_code', ''), + } + + # 添加位置数据 + axis_pos = data.get('axis_position', {}) + row.update({ + 'axis_x': axis_pos.get('x', 0.0), + 'axis_y': axis_pos.get('y', 0.0), + 'axis_z': axis_pos.get('z', 0.0), + }) + + # 添加环境数据 + env = data.get('environment', {}) + row.update({ + 'glove_box_pressure': env.get('glove_box_pressure', 0.0), + 'o2_content': env.get('o2_content', 0.0), + 'water_content': env.get('water_content', 0.0), + }) + + writer.writerow(row) + + except Exception as e: + logger.error(f"追加数据到导出文件失败: {e}") + + def _decode_string(self, data_list: List[int]) -> str: + """将寄存器数据解码为字符串""" + try: + # 假设每个寄存器包含2个字符(16位) + chars = [] + for value in data_list: + if value == 0: + break + chars.append(chr(value & 0xFF)) + if (value >> 8) & 0xFF != 0: + chars.append(chr((value >> 8) & 0xFF)) + return ''.join(chars).rstrip('\x00') + except: + return "" diff --git a/unilabos/device_comms/workstation_material_management.py b/unilabos/device_comms/workstation_material_management.py new file mode 100644 index 00000000..a9229130 --- /dev/null +++ b/unilabos/device_comms/workstation_material_management.py @@ -0,0 +1,583 @@ +""" +工作站物料管理基类 +Workstation Material Management Base Class + +基于PyLabRobot的物料管理系统 +""" +from typing import Dict, Any, List, Optional, Union, Type +from abc import ABC, abstractmethod +import json + +from pylabrobot.resources import ( + Resource as PLRResource, + Container, + Deck, + Coordinate as PLRCoordinate, +) + +from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker +from unilabos.utils.log import logger +from unilabos.resources.graphio import resource_plr_to_ulab, resource_ulab_to_plr + + +class MaterialManagementBase(ABC): + """物料管理基类 + + 定义工作站物料管理的标准接口: + 1. 物料初始化 - 根据配置创建物料资源 + 2. 物料追踪 - 实时跟踪物料位置和状态 + 3. 物料查找 - 按类型、位置、状态查找物料 + 4. 物料转换 - PyLabRobot与UniLab资源格式转换 + """ + + def __init__( + self, + device_id: str, + deck_config: Dict[str, Any], + resource_tracker: DeviceNodeResourceTracker, + children_config: Dict[str, Dict[str, Any]] = None + ): + self.device_id = device_id + self.deck_config = deck_config + self.resource_tracker = resource_tracker + self.children_config = children_config or {} + + # 创建主台面 + self.plr_deck = self._create_deck() + + # 扩展ResourceTracker + self._extend_resource_tracker() + + # 注册deck到resource tracker + self.resource_tracker.add_resource(self.plr_deck) + + # 初始化子资源 + self.plr_resources = {} + self._initialize_materials() + + def _create_deck(self) -> Deck: + """创建主台面""" + return Deck( + name=f"{self.device_id}_deck", + size_x=self.deck_config.get("size_x", 1000.0), + size_y=self.deck_config.get("size_y", 1000.0), + size_z=self.deck_config.get("size_z", 500.0), + origin=PLRCoordinate(0, 0, 0) + ) + + def _extend_resource_tracker(self): + """扩展ResourceTracker以支持PyLabRobot特定功能""" + + def find_by_type(resource_type): + """按类型查找资源""" + return self._find_resources_by_type_recursive(self.plr_deck, resource_type) + + def find_by_category(category: str): + """按类别查找资源""" + found = [] + for resource in self._get_all_resources(): + if hasattr(resource, 'category') and resource.category == category: + found.append(resource) + return found + + def find_by_name_pattern(pattern: str): + """按名称模式查找资源""" + import re + found = [] + for resource in self._get_all_resources(): + if re.search(pattern, resource.name): + found.append(resource) + return found + + # 动态添加方法到resource_tracker + self.resource_tracker.find_by_type = find_by_type + self.resource_tracker.find_by_category = find_by_category + self.resource_tracker.find_by_name_pattern = find_by_name_pattern + + def _find_resources_by_type_recursive(self, resource, target_type): + """递归查找指定类型的资源""" + found = [] + if isinstance(resource, target_type): + found.append(resource) + + # 递归查找子资源 + children = getattr(resource, "children", []) + for child in children: + found.extend(self._find_resources_by_type_recursive(child, target_type)) + + return found + + def _get_all_resources(self) -> List[PLRResource]: + """获取所有资源""" + all_resources = [] + + def collect_resources(resource): + all_resources.append(resource) + children = getattr(resource, "children", []) + for child in children: + collect_resources(child) + + collect_resources(self.plr_deck) + return all_resources + + def _initialize_materials(self): + """初始化物料""" + try: + # 确定创建顺序,确保父资源先于子资源创建 + creation_order = self._determine_creation_order() + + # 按顺序创建资源 + for resource_id in creation_order: + config = self.children_config[resource_id] + self._create_plr_resource(resource_id, config) + + logger.info(f"物料管理系统初始化完成,共创建 {len(self.plr_resources)} 个资源") + + except Exception as e: + logger.error(f"物料初始化失败: {e}") + + def _determine_creation_order(self) -> List[str]: + """确定资源创建顺序""" + order = [] + visited = set() + + def visit(resource_id: str): + if resource_id in visited: + return + visited.add(resource_id) + + config = self.children_config.get(resource_id, {}) + parent_id = config.get("parent") + + # 如果有父资源,先访问父资源 + if parent_id and parent_id in self.children_config: + visit(parent_id) + + order.append(resource_id) + + for resource_id in self.children_config: + visit(resource_id) + + return order + + def _create_plr_resource(self, resource_id: str, config: Dict[str, Any]): + """创建PyLabRobot资源""" + try: + resource_type = config.get("type", "unknown") + data = config.get("data", {}) + location_config = config.get("location", {}) + + # 创建位置坐标 + location = PLRCoordinate( + x=location_config.get("x", 0.0), + y=location_config.get("y", 0.0), + z=location_config.get("z", 0.0) + ) + + # 根据类型创建资源 + resource = self._create_resource_by_type(resource_id, resource_type, config, data, location) + + if resource: + # 设置父子关系 + parent_id = config.get("parent") + if parent_id and parent_id in self.plr_resources: + parent_resource = self.plr_resources[parent_id] + parent_resource.assign_child_resource(resource, location) + else: + # 直接放在deck上 + self.plr_deck.assign_child_resource(resource, location) + + # 保存资源引用 + self.plr_resources[resource_id] = resource + + # 注册到resource tracker + self.resource_tracker.add_resource(resource) + + logger.debug(f"创建资源成功: {resource_id} ({resource_type})") + + except Exception as e: + logger.error(f"创建资源失败 {resource_id}: {e}") + + @abstractmethod + def _create_resource_by_type( + self, + resource_id: str, + resource_type: str, + config: Dict[str, Any], + data: Dict[str, Any], + location: PLRCoordinate + ) -> Optional[PLRResource]: + """根据类型创建资源 - 子类必须实现""" + pass + + # ============ 物料查找接口 ============ + + def find_materials_by_type(self, material_type: str) -> List[PLRResource]: + """按材料类型查找物料""" + return self.resource_tracker.find_by_category(material_type) + + def find_material_by_id(self, resource_id: str) -> Optional[PLRResource]: + """按ID查找物料""" + return self.plr_resources.get(resource_id) + + def find_available_positions(self, position_type: str) -> List[PLRResource]: + """查找可用位置""" + positions = self.resource_tracker.find_by_category(position_type) + available = [] + + for pos in positions: + if hasattr(pos, 'is_available') and pos.is_available(): + available.append(pos) + elif hasattr(pos, 'children') and len(pos.children) == 0: + available.append(pos) + + return available + + def get_material_inventory(self) -> Dict[str, int]: + """获取物料库存统计""" + inventory = {} + + for resource in self._get_all_resources(): + if hasattr(resource, 'category'): + category = resource.category + inventory[category] = inventory.get(category, 0) + 1 + + return inventory + + # ============ 物料状态更新接口 ============ + + def update_material_location(self, material_id: str, new_location: PLRCoordinate) -> bool: + """更新物料位置""" + try: + material = self.find_material_by_id(material_id) + if material: + material.location = new_location + return True + return False + except Exception as e: + logger.error(f"更新物料位置失败: {e}") + return False + + def move_material(self, material_id: str, target_container_id: str) -> bool: + """移动物料到目标容器""" + try: + material = self.find_material_by_id(material_id) + target = self.find_material_by_id(target_container_id) + + if material and target: + # 从原位置移除 + if material.parent: + material.parent.unassign_child_resource(material) + + # 添加到新位置 + target.assign_child_resource(material) + return True + + return False + + except Exception as e: + logger.error(f"移动物料失败: {e}") + return False + + # ============ 资源转换接口 ============ + + def convert_to_unilab_format(self, plr_resource: PLRResource) -> Dict[str, Any]: + """将PyLabRobot资源转换为UniLab格式""" + return resource_plr_to_ulab(plr_resource) + + def convert_from_unilab_format(self, unilab_resource: Dict[str, Any]) -> PLRResource: + """将UniLab格式转换为PyLabRobot资源""" + return resource_ulab_to_plr(unilab_resource) + + def get_deck_state(self) -> Dict[str, Any]: + """获取Deck状态""" + try: + return { + "deck_info": { + "name": self.plr_deck.name, + "size": { + "x": self.plr_deck.size_x, + "y": self.plr_deck.size_y, + "z": self.plr_deck.size_z + }, + "children_count": len(self.plr_deck.children) + }, + "resources": { + resource_id: self.convert_to_unilab_format(resource) + for resource_id, resource in self.plr_resources.items() + }, + "inventory": self.get_material_inventory() + } + except Exception as e: + logger.error(f"获取Deck状态失败: {e}") + return {"error": str(e)} + + # ============ 数据持久化接口 ============ + + def save_state_to_file(self, file_path: str) -> bool: + """保存状态到文件""" + try: + state = self.get_deck_state() + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(state, f, indent=2, ensure_ascii=False) + logger.info(f"状态已保存到: {file_path}") + return True + except Exception as e: + logger.error(f"保存状态失败: {e}") + return False + + def load_state_from_file(self, file_path: str) -> bool: + """从文件加载状态""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + state = json.load(f) + + # 重新创建资源 + self._recreate_resources_from_state(state) + logger.info(f"状态已从文件加载: {file_path}") + return True + + except Exception as e: + logger.error(f"加载状态失败: {e}") + return False + + def _recreate_resources_from_state(self, state: Dict[str, Any]): + """从状态重新创建资源""" + # 清除现有资源 + self.plr_resources.clear() + self.plr_deck.children.clear() + + # 从状态重新创建 + resources_data = state.get("resources", {}) + for resource_id, resource_data in resources_data.items(): + try: + plr_resource = self.convert_from_unilab_format(resource_data) + self.plr_resources[resource_id] = plr_resource + self.plr_deck.assign_child_resource(plr_resource) + except Exception as e: + logger.error(f"重新创建资源失败 {resource_id}: {e}") + + +class CoinCellMaterialManagement(MaterialManagementBase): + """纽扣电池物料管理类 + + 从 button_battery_station 抽取的物料管理功能 + """ + + def _create_resource_by_type( + self, + resource_id: str, + resource_type: str, + config: Dict[str, Any], + data: Dict[str, Any], + location: PLRCoordinate + ) -> Optional[PLRResource]: + """根据类型创建纽扣电池相关资源""" + + # 导入纽扣电池资源类 + from unilabos.device_comms.button_battery_station import ( + MaterialPlate, PlateSlot, ClipMagazine, BatteryPressSlot, + TipBox64, WasteTipBox, BottleRack, Battery, ElectrodeSheet + ) + + try: + if resource_type == "material_plate": + return self._create_material_plate(resource_id, config, data, location) + + elif resource_type == "plate_slot": + return self._create_plate_slot(resource_id, config, data, location) + + elif resource_type == "clip_magazine": + return self._create_clip_magazine(resource_id, config, data, location) + + elif resource_type == "battery_press_slot": + return self._create_battery_press_slot(resource_id, config, data, location) + + elif resource_type == "tip_box": + return self._create_tip_box(resource_id, config, data, location) + + elif resource_type == "waste_tip_box": + return self._create_waste_tip_box(resource_id, config, data, location) + + elif resource_type == "bottle_rack": + return self._create_bottle_rack(resource_id, config, data, location) + + elif resource_type == "battery": + return self._create_battery(resource_id, config, data, location) + + else: + logger.warning(f"未知的资源类型: {resource_type}") + return None + + except Exception as e: + logger.error(f"创建资源失败 {resource_id} ({resource_type}): {e}") + return None + + def _create_material_plate(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate): + """创建料板""" + from unilabos.device_comms.button_battery_station import MaterialPlate, ElectrodeSheet + + plate = MaterialPlate( + name=resource_id, + size_x=config.get("size_x", 80.0), + size_y=config.get("size_y", 80.0), + size_z=config.get("size_z", 10.0), + hole_diameter=config.get("hole_diameter", 15.0), + hole_depth=config.get("hole_depth", 8.0), + hole_spacing_x=config.get("hole_spacing_x", 20.0), + hole_spacing_y=config.get("hole_spacing_y", 20.0), + number=data.get("number", "") + ) + plate.location = location + + # 如果有预填充的极片数据,创建极片 + electrode_sheets = data.get("electrode_sheets", []) + for i, sheet_data in enumerate(electrode_sheets): + if i < len(plate.children): # 确保不超过洞位数量 + hole = plate.children[i] + sheet = ElectrodeSheet( + name=f"{resource_id}_sheet_{i}", + diameter=sheet_data.get("diameter", 14.0), + thickness=sheet_data.get("thickness", 0.1), + mass=sheet_data.get("mass", 0.01), + material_type=sheet_data.get("material_type", "cathode"), + info=sheet_data.get("info", "") + ) + hole.place_electrode_sheet(sheet) + + return plate + + def _create_plate_slot(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate): + """创建板槽位""" + from unilabos.device_comms.button_battery_station import PlateSlot + + slot = PlateSlot( + name=resource_id, + max_plates=config.get("max_plates", 8) + ) + slot.location = location + return slot + + def _create_clip_magazine(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate): + """创建子弹夹""" + from unilabos.device_comms.button_battery_station import ClipMagazine + + magazine = ClipMagazine( + name=resource_id, + size_x=config.get("size_x", 150.0), + size_y=config.get("size_y", 100.0), + size_z=config.get("size_z", 50.0), + hole_diameter=config.get("hole_diameter", 15.0), + hole_depth=config.get("hole_depth", 40.0), + hole_spacing=config.get("hole_spacing", 25.0), + max_sheets_per_hole=config.get("max_sheets_per_hole", 100) + ) + magazine.location = location + return magazine + + def _create_battery_press_slot(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate): + """创建电池压制槽""" + from unilabos.device_comms.button_battery_station import BatteryPressSlot + + slot = BatteryPressSlot( + name=resource_id, + diameter=config.get("diameter", 20.0), + depth=config.get("depth", 15.0) + ) + slot.location = location + return slot + + def _create_tip_box(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate): + """创建枪头盒""" + from unilabos.device_comms.button_battery_station import TipBox64 + + tip_box = TipBox64( + name=resource_id, + size_x=config.get("size_x", 127.8), + size_y=config.get("size_y", 85.5), + size_z=config.get("size_z", 60.0), + with_tips=data.get("with_tips", True) + ) + tip_box.location = location + return tip_box + + def _create_waste_tip_box(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate): + """创建废枪头盒""" + from unilabos.device_comms.button_battery_station import WasteTipBox + + waste_box = WasteTipBox( + name=resource_id, + size_x=config.get("size_x", 127.8), + size_y=config.get("size_y", 85.5), + size_z=config.get("size_z", 60.0), + max_tips=config.get("max_tips", 100) + ) + waste_box.location = location + return waste_box + + def _create_bottle_rack(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate): + """创建瓶架""" + from unilabos.device_comms.button_battery_station import BottleRack + + rack = BottleRack( + name=resource_id, + size_x=config.get("size_x", 210.0), + size_y=config.get("size_y", 140.0), + size_z=config.get("size_z", 100.0), + bottle_diameter=config.get("bottle_diameter", 30.0), + bottle_height=config.get("bottle_height", 100.0), + position_spacing=config.get("position_spacing", 35.0) + ) + rack.location = location + return rack + + def _create_battery(self, resource_id: str, config: Dict[str, Any], data: Dict[str, Any], location: PLRCoordinate): + """创建电池""" + from unilabos.device_comms.button_battery_station import Battery + + battery = Battery( + name=resource_id, + diameter=config.get("diameter", 20.0), + height=config.get("height", 3.2), + max_volume=config.get("max_volume", 100.0), + barcode=data.get("barcode", "") + ) + battery.location = location + return battery + + # ============ 纽扣电池特定查找方法 ============ + + def find_material_plates(self): + """查找所有料板""" + from unilabos.device_comms.button_battery_station import MaterialPlate + return self.resource_tracker.find_by_type(MaterialPlate) + + def find_batteries(self): + """查找所有电池""" + from unilabos.device_comms.button_battery_station import Battery + return self.resource_tracker.find_by_type(Battery) + + def find_electrode_sheets(self): + """查找所有极片""" + found = [] + plates = self.find_material_plates() + for plate in plates: + for hole in plate.children: + if hasattr(hole, 'has_electrode_sheet') and hole.has_electrode_sheet(): + found.append(hole._electrode_sheet) + return found + + def find_plate_slots(self): + """查找所有板槽位""" + from unilabos.device_comms.button_battery_station import PlateSlot + return self.resource_tracker.find_by_type(PlateSlot) + + def find_clip_magazines(self): + """查找所有子弹夹""" + from unilabos.device_comms.button_battery_station import ClipMagazine + return self.resource_tracker.find_by_type(ClipMagazine) + + def find_press_slots(self): + """查找所有压制槽""" + from unilabos.device_comms.button_battery_station import BatteryPressSlot + return self.resource_tracker.find_by_type(BatteryPressSlot)