From 5ec8a57a1f6038e28f75e63cfc7e7ba499bd5594 Mon Sep 17 00:00:00 2001 From: Junhan Chang Date: Mon, 25 Aug 2025 22:09:37 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20ProtocolNode=E2=86=92WorkstationNod?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../02-topology-and-chemputer-compile.md | 2 +- .../workstation_architecture.md | 12 +- unilabos/device_comms/modbus_plc/client.py | 4 +- .../modbus_plc/{node => }/modbus.py | 0 .../device_comms/modbus_plc/node/__init__.py | 0 .../device_comms/modbus_plc/test/client.py | 2 +- .../device_comms/modbus_plc/test/node_test.py | 2 +- .../modbus_plc/test/test_workflow.py | 2 +- .../devices/workstation/workflow_executors.py | 649 ++++++++++++++++++ .../workstation_base.py | 331 ++++----- .../workstation_http_service.py | 0 .../workstation_material_management.py | 0 unilabos/registry/devices/work_station.yaml | 2 +- unilabos/ros/msgs/message_converter.py | 2 + unilabos/ros/nodes/base_device_node.py | 73 +- unilabos/ros/nodes/presets/host_node.py | 2 +- unilabos/ros/nodes/presets/protocol_node.py | 347 ++++++---- unilabos/ros/nodes/presets/workstation.py | 4 +- unilabos/ros/utils/driver_creator.py | 20 +- 19 files changed, 1089 insertions(+), 365 deletions(-) rename unilabos/device_comms/modbus_plc/{node => }/modbus.py (100%) delete mode 100644 unilabos/device_comms/modbus_plc/node/__init__.py create mode 100644 unilabos/devices/workstation/workflow_executors.py rename unilabos/devices/{work_station => workstation}/workstation_base.py (56%) rename unilabos/devices/{work_station => workstation}/workstation_http_service.py (100%) rename unilabos/devices/{work_station => workstation}/workstation_material_management.py (100%) diff --git a/docs/concepts/02-topology-and-chemputer-compile.md b/docs/concepts/02-topology-and-chemputer-compile.md index 9e40bad9..3fc18e23 100644 --- a/docs/concepts/02-topology-and-chemputer-compile.md +++ b/docs/concepts/02-topology-and-chemputer-compile.md @@ -19,7 +19,7 @@ Uni-Lab 的组态图当前支持 node-link json 和 graphml 格式,其中包 对用户来说,“直接操作设备执行单个指令”不是个真实需求,真正的需求是**“执行对实验有意义的单个完整动作”——加入某种液体多少量;萃取分液;洗涤仪器等等。就像实验步骤文字书写的那样。** 而这些对实验有意义的单个完整动作,**一般需要多个设备的协同**,还依赖于他们的**物理连接关系(管道相连;机械臂可转运)**。 -于是 Uni-Lab 实现了抽象的“工作站”,即注册表中的 `workstation` 设备(`ProtocolNode`类)来处理编译、规划操作。以泵骨架组成的自动有机实验室为例,设备管道连接关系如下: +于是 Uni-Lab 实现了抽象的“工作站”,即注册表中的 `workstation` 设备(`WorkstationNode`类)来处理编译、规划操作。以泵骨架组成的自动有机实验室为例,设备管道连接关系如下: ![topology](image/02-topology-and-chemputer-compile/topology.png) diff --git a/docs/developer_guide/workstation_architecture.md b/docs/developer_guide/workstation_architecture.md index 3bdb52e2..f9d113e2 100644 --- a/docs/developer_guide/workstation_architecture.md +++ b/docs/developer_guide/workstation_architecture.md @@ -6,7 +6,7 @@ graph TB subgraph "工作站基础架构" WB[WorkstationBase] - WB --> |继承| RPN[ROS2ProtocolNode] + WB --> |继承| RPN[ROS2WorkstationNode] WB --> |组合| WCB[WorkstationCommunicationBase] WB --> |组合| MMB[MaterialManagementBase] WB --> |组合| WHS[WorkstationHTTPService] @@ -73,7 +73,7 @@ classDiagram +get_device_status() } - class ROS2ProtocolNode { + class ROS2WorkstationNode { +sub_devices: Dict +protocol_names: List +execute_single_action() @@ -131,7 +131,7 @@ classDiagram +_register_supported_workflows() } - WorkstationBase --|> ROS2ProtocolNode + WorkstationBase --|> ROS2WorkstationNode WorkstationBase *-- WorkstationCommunicationBase WorkstationBase *-- MaterialManagementBase WorkstationBase *-- WorkstationHTTPService @@ -155,10 +155,10 @@ sequenceDiagram participant COMM as CommunicationModule participant MAT as MaterialManager participant HTTP as HTTPService - participant ROS as ROS2ProtocolNode + participant ROS as ROS2WorkstationNode APP->>WS: 创建工作站实例 - WS->>ROS: 初始化ROS2ProtocolNode + WS->>ROS: 初始化ROS2WorkstationNode ROS->>ROS: 初始化子设备 ROS->>ROS: 设置硬件接口代理 @@ -191,7 +191,7 @@ sequenceDiagram participant WS as WorkstationBase participant COMM as CommunicationModule participant MAT as MaterialManager - participant ROS as ROS2ProtocolNode + participant ROS as ROS2WorkstationNode participant DEV as SubDevice EXT->>WS: start_workflow(type, params) diff --git a/unilabos/device_comms/modbus_plc/client.py b/unilabos/device_comms/modbus_plc/client.py index a7da3aff..c239b9d5 100644 --- a/unilabos/device_comms/modbus_plc/client.py +++ b/unilabos/device_comms/modbus_plc/client.py @@ -8,8 +8,8 @@ from pymodbus.client import ModbusSerialClient, ModbusTcpClient from pymodbus.framer import FramerType from typing import TypedDict -from unilabos.device_comms.modbus_plc.node.modbus import DeviceType, HoldRegister, Coil, InputRegister, DiscreteInputs, DataType, WorderOrder -from unilabos.device_comms.modbus_plc.node.modbus import Base as ModbusNodeBase +from unilabos.device_comms.modbus_plc.modbus import DeviceType, HoldRegister, Coil, InputRegister, DiscreteInputs, DataType, WorderOrder +from unilabos.device_comms.modbus_plc.modbus import Base as ModbusNodeBase from unilabos.device_comms.universal_driver import UniversalDriver from unilabos.utils.log import logger import pandas as pd diff --git a/unilabos/device_comms/modbus_plc/node/modbus.py b/unilabos/device_comms/modbus_plc/modbus.py similarity index 100% rename from unilabos/device_comms/modbus_plc/node/modbus.py rename to unilabos/device_comms/modbus_plc/modbus.py diff --git a/unilabos/device_comms/modbus_plc/node/__init__.py b/unilabos/device_comms/modbus_plc/node/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/unilabos/device_comms/modbus_plc/test/client.py b/unilabos/device_comms/modbus_plc/test/client.py index 070e180b..416f8493 100644 --- a/unilabos/device_comms/modbus_plc/test/client.py +++ b/unilabos/device_comms/modbus_plc/test/client.py @@ -1,6 +1,6 @@ import time from pymodbus.client import ModbusTcpClient -from unilabos.device_comms.modbus_plc.node.modbus import Coil, HoldRegister +from unilabos.device_comms.modbus_plc.modbus import Coil, HoldRegister from pymodbus.payload import BinaryPayloadDecoder from pymodbus.constants import Endian diff --git a/unilabos/device_comms/modbus_plc/test/node_test.py b/unilabos/device_comms/modbus_plc/test/node_test.py index d2fa2d75..d36f28d7 100644 --- a/unilabos/device_comms/modbus_plc/test/node_test.py +++ b/unilabos/device_comms/modbus_plc/test/node_test.py @@ -1,6 +1,6 @@ # coding=utf-8 from pymodbus.client import ModbusTcpClient -from unilabos.device_comms.modbus_plc.node.modbus import Coil +from unilabos.device_comms.modbus_plc.modbus import Coil import time diff --git a/unilabos/device_comms/modbus_plc/test/test_workflow.py b/unilabos/device_comms/modbus_plc/test/test_workflow.py index e418a3c5..8f764d6a 100644 --- a/unilabos/device_comms/modbus_plc/test/test_workflow.py +++ b/unilabos/device_comms/modbus_plc/test/test_workflow.py @@ -1,7 +1,7 @@ import time from typing import Callable from unilabos.device_comms.modbus_plc.client import TCPClient, ModbusWorkflow, WorkflowAction, load_csv -from unilabos.device_comms.modbus_plc.node.modbus import Base as ModbusNodeBase +from unilabos.device_comms.modbus_plc.modbus import Base as ModbusNodeBase ############ 第一种写法 ############## diff --git a/unilabos/devices/workstation/workflow_executors.py b/unilabos/devices/workstation/workflow_executors.py new file mode 100644 index 00000000..93f00ae4 --- /dev/null +++ b/unilabos/devices/workstation/workflow_executors.py @@ -0,0 +1,649 @@ +""" +工作流执行器模块 +Workflow Executors Module + +基于单一硬件接口的工作流执行器实现 +支持Modbus、HTTP、PyLabRobot和代理模式 +""" +import time +import json +import asyncio +from typing import Dict, Any, List, Optional, TYPE_CHECKING +from abc import ABC, abstractmethod + +if TYPE_CHECKING: + from unilabos.devices.work_station.workstation_base import WorkstationBase + +from unilabos.utils.log import logger + + +class WorkflowExecutor(ABC): + """工作流执行器基类 - 基于单一硬件接口""" + + def __init__(self, workstation: 'WorkstationBase'): + self.workstation = workstation + self.hardware_interface = workstation.hardware_interface + self.material_management = workstation.material_management + + @abstractmethod + def execute_workflow(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: + """执行工作流""" + pass + + @abstractmethod + def stop_workflow(self, emergency: bool = False) -> bool: + """停止工作流""" + pass + + def call_device(self, method: str, *args, **kwargs) -> Any: + """调用设备方法的统一接口""" + return self.workstation.call_device_method(method, *args, **kwargs) + + def get_device_status(self) -> Dict[str, Any]: + """获取设备状态""" + return self.workstation.get_device_status() + + +class ModbusWorkflowExecutor(WorkflowExecutor): + """Modbus工作流执行器 - 适配 coin_cell_assembly_system""" + + def __init__(self, workstation: 'WorkstationBase'): + super().__init__(workstation) + + # 验证Modbus接口 + if not (hasattr(self.hardware_interface, 'write_register') and + hasattr(self.hardware_interface, 'read_register')): + raise RuntimeError("工作站硬件接口不是有效的Modbus客户端") + + def execute_workflow(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: + """执行Modbus工作流""" + if workflow_name == "battery_manufacturing": + return self._execute_battery_manufacturing(parameters) + elif workflow_name == "material_loading": + return self._execute_material_loading(parameters) + elif workflow_name == "quality_check": + return self._execute_quality_check(parameters) + else: + logger.warning(f"不支持的Modbus工作流: {workflow_name}") + return False + + def _execute_battery_manufacturing(self, parameters: Dict[str, Any]) -> bool: + """执行电池制造工作流""" + try: + # 1. 物料准备检查 + available_slot = self._find_available_press_slot() + if not available_slot: + raise RuntimeError("没有可用的压制槽") + + logger.info(f"找到可用压制槽: {available_slot}") + + # 2. 设置工艺参数(直接调用Modbus接口) + if "electrolyte_num" in parameters: + self.hardware_interface.write_register('REG_MSG_ELECTROLYTE_NUM', parameters["electrolyte_num"]) + logger.info(f"设置电解液编号: {parameters['electrolyte_num']}") + + if "electrolyte_volume" in parameters: + self.hardware_interface.write_register('REG_MSG_ELECTROLYTE_VOLUME', + parameters["electrolyte_volume"], + data_type="FLOAT32") + logger.info(f"设置电解液体积: {parameters['electrolyte_volume']}") + + if "assembly_pressure" in parameters: + self.hardware_interface.write_register('REG_MSG_ASSEMBLY_PRESSURE', + parameters["assembly_pressure"], + data_type="FLOAT32") + logger.info(f"设置装配压力: {parameters['assembly_pressure']}") + + # 3. 启动制造流程 + self.hardware_interface.write_register('COIL_SYS_START_CMD', True) + logger.info("启动电池制造流程") + + # 4. 确认启动成功 + time.sleep(0.5) + status = self.hardware_interface.read_register('COIL_SYS_START_STATUS', count=1) + success = status[0] if status else False + + if success: + logger.info(f"电池制造工作流启动成功,参数: {parameters}") + else: + logger.error("电池制造工作流启动失败") + + return success + + except Exception as e: + logger.error(f"执行电池制造工作流失败: {e}") + return False + + def _execute_material_loading(self, parameters: Dict[str, Any]) -> bool: + """执行物料装载工作流""" + try: + material_type = parameters.get('material_type', 'cathode') + position = parameters.get('position', 'A1') + + logger.info(f"开始物料装载: {material_type} -> {position}") + + # 设置物料类型和位置 + self.hardware_interface.write_register('REG_MATERIAL_TYPE', material_type) + self.hardware_interface.write_register('REG_MATERIAL_POSITION', position) + + # 启动装载 + self.hardware_interface.write_register('COIL_LOAD_START', True) + + # 等待装载完成 + timeout = parameters.get('timeout', 30) + start_time = time.time() + + while time.time() - start_time < timeout: + status = self.hardware_interface.read_register('COIL_LOAD_COMPLETE', count=1) + if status and status[0]: + logger.info(f"物料装载完成: {material_type} -> {position}") + return True + time.sleep(0.5) + + logger.error(f"物料装载超时: {material_type} -> {position}") + return False + + except Exception as e: + logger.error(f"执行物料装载失败: {e}") + return False + + def _execute_quality_check(self, parameters: Dict[str, Any]) -> bool: + """执行质量检测工作流""" + try: + check_type = parameters.get('check_type', 'dimensional') + + logger.info(f"开始质量检测: {check_type}") + + # 启动质量检测 + self.hardware_interface.write_register('REG_QC_TYPE', check_type) + self.hardware_interface.write_register('COIL_QC_START', True) + + # 等待检测完成 + timeout = parameters.get('timeout', 60) + start_time = time.time() + + while time.time() - start_time < timeout: + status = self.hardware_interface.read_register('COIL_QC_COMPLETE', count=1) + if status and status[0]: + # 读取检测结果 + result = self.hardware_interface.read_register('REG_QC_RESULT', count=1) + passed = result[0] if result else False + + if passed: + logger.info(f"质量检测通过: {check_type}") + return True + else: + logger.warning(f"质量检测失败: {check_type}") + return False + + time.sleep(1.0) + + logger.error(f"质量检测超时: {check_type}") + return False + + except Exception as e: + logger.error(f"执行质量检测失败: {e}") + return False + + def _find_available_press_slot(self) -> Optional[str]: + """查找可用压制槽""" + try: + press_slots = self.material_management.find_by_category("battery_press_slot") + for slot in press_slots: + if hasattr(slot, 'has_battery') and not slot.has_battery(): + return slot.name + return None + except: + # 如果物料管理系统不可用,返回默认槽位 + return "A1" + + def stop_workflow(self, emergency: bool = False) -> bool: + """停止工作流""" + try: + if emergency: + self.hardware_interface.write_register('COIL_SYS_RESET_CMD', True) + logger.warning("执行紧急停止") + else: + self.hardware_interface.write_register('COIL_SYS_STOP_CMD', True) + logger.info("执行正常停止") + + time.sleep(0.5) + status = self.hardware_interface.read_register('COIL_SYS_STOP_STATUS', count=1) + return status[0] if status else False + + except Exception as e: + logger.error(f"停止Modbus工作流失败: {e}") + return False + + +class HttpWorkflowExecutor(WorkflowExecutor): + """HTTP工作流执行器 - 适配 reaction_station_bioyong""" + + def __init__(self, workstation: 'WorkstationBase'): + super().__init__(workstation) + + # 验证HTTP接口 + if not (hasattr(self.hardware_interface, 'post') or + hasattr(self.hardware_interface, 'get')): + raise RuntimeError("工作站硬件接口不是有效的HTTP客户端") + + def execute_workflow(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: + """执行HTTP工作流""" + try: + if workflow_name == "reaction_synthesis": + return self._execute_reaction_synthesis(parameters) + elif workflow_name == "liquid_feeding": + return self._execute_liquid_feeding(parameters) + elif workflow_name == "temperature_control": + return self._execute_temperature_control(parameters) + else: + logger.warning(f"不支持的HTTP工作流: {workflow_name}") + return False + + except Exception as e: + logger.error(f"执行HTTP工作流失败: {e}") + return False + + def _execute_reaction_synthesis(self, parameters: Dict[str, Any]) -> bool: + """执行反应合成工作流""" + try: + # 1. 设置工作流序列 + sequence = self._build_reaction_sequence(parameters) + self._call_rpc_method('set_workflow_sequence', json.dumps(sequence)) + + # 2. 设置反应参数 + if parameters.get('temperature'): + self._call_rpc_method('set_temperature', parameters['temperature']) + + if parameters.get('pressure'): + self._call_rpc_method('set_pressure', parameters['pressure']) + + if parameters.get('stirring_speed'): + self._call_rpc_method('set_stirring_speed', parameters['stirring_speed']) + + # 3. 执行工作流 + result = self._call_rpc_method('execute_current_sequence', { + "task_name": "reaction_synthesis" + }) + + success = result.get('success', False) + if success: + logger.info("反应合成工作流执行成功") + else: + logger.error(f"反应合成工作流执行失败: {result.get('error', '未知错误')}") + + return success + + except Exception as e: + logger.error(f"执行反应合成工作流失败: {e}") + return False + + def _execute_liquid_feeding(self, parameters: Dict[str, Any]) -> bool: + """执行液体投料工作流""" + try: + reagents = parameters.get('reagents', []) + volumes = parameters.get('volumes', []) + + if len(reagents) != len(volumes): + raise ValueError("试剂列表和体积列表长度不匹配") + + # 执行投料序列 + for reagent, volume in zip(reagents, volumes): + result = self._call_rpc_method('feed_liquid', { + 'reagent': reagent, + 'volume': volume + }) + + if not result.get('success', False): + logger.error(f"投料失败: {reagent} {volume}mL") + return False + + logger.info(f"投料成功: {reagent} {volume}mL") + + return True + + except Exception as e: + logger.error(f"执行液体投料失败: {e}") + return False + + def _execute_temperature_control(self, parameters: Dict[str, Any]) -> bool: + """执行温度控制工作流""" + try: + target_temp = parameters.get('temperature', 25) + hold_time = parameters.get('hold_time', 300) # 秒 + + # 设置目标温度 + result = self._call_rpc_method('set_temperature', target_temp) + if not result.get('success', False): + logger.error(f"设置温度失败: {target_temp}°C") + return False + + # 等待温度稳定 + logger.info(f"等待温度稳定到 {target_temp}°C") + + # 保持温度指定时间 + if hold_time > 0: + logger.info(f"保持温度 {hold_time} 秒") + time.sleep(hold_time) + + return True + + except Exception as e: + logger.error(f"执行温度控制失败: {e}") + return False + + def _build_reaction_sequence(self, parameters: Dict[str, Any]) -> List[str]: + """构建反应合成工作流序列""" + sequence = [] + + # 添加预处理步骤 + if parameters.get('purge_with_inert'): + sequence.append("purge_inert_gas") + + # 添加温度设置 + if parameters.get('temperature'): + sequence.append(f"set_temperature_{parameters['temperature']}") + + # 添加压力设置 + if parameters.get('pressure'): + sequence.append(f"set_pressure_{parameters['pressure']}") + + # 添加搅拌设置 + if parameters.get('stirring_speed'): + sequence.append(f"set_stirring_{parameters['stirring_speed']}") + + # 添加反应步骤 + sequence.extend([ + "start_reaction", + "monitor_progress", + "complete_reaction" + ]) + + # 添加后处理步骤 + if parameters.get('cooling_required'): + sequence.append("cool_down") + + return sequence + + def _call_rpc_method(self, method: str, params: Any = None) -> Dict[str, Any]: + """调用RPC方法""" + try: + if hasattr(self.hardware_interface, method): + # 直接方法调用 + if isinstance(params, dict): + params = json.dumps(params) + elif params is None: + params = "" + return getattr(self.hardware_interface, method)(params) + else: + # HTTP请求调用 + if hasattr(self.hardware_interface, 'post'): + response = self.hardware_interface.post(f"/api/{method}", json=params) + return response.json() + else: + raise AttributeError(f"HTTP接口不支持方法: {method}") + except Exception as e: + logger.error(f"调用RPC方法失败 {method}: {e}") + return {'success': False, 'error': str(e)} + + def stop_workflow(self, emergency: bool = False) -> bool: + """停止工作流""" + try: + if emergency: + result = self._call_rpc_method('scheduler_reset') + else: + result = self._call_rpc_method('scheduler_stop') + + return result.get('success', False) + + except Exception as e: + logger.error(f"停止HTTP工作流失败: {e}") + return False + + +class PyLabRobotWorkflowExecutor(WorkflowExecutor): + """PyLabRobot工作流执行器 - 适配 prcxi.py""" + + def __init__(self, workstation: 'WorkstationBase'): + super().__init__(workstation) + + # 验证PyLabRobot接口 + if not (hasattr(self.hardware_interface, 'transfer_liquid') or + hasattr(self.hardware_interface, 'pickup_tips')): + raise RuntimeError("工作站硬件接口不是有效的PyLabRobot设备") + + def execute_workflow(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: + """执行PyLabRobot工作流""" + try: + if workflow_name == "liquid_transfer": + return self._execute_liquid_transfer(parameters) + elif workflow_name == "tip_pickup_drop": + return self._execute_tip_operations(parameters) + elif workflow_name == "plate_handling": + return self._execute_plate_handling(parameters) + else: + logger.warning(f"不支持的PyLabRobot工作流: {workflow_name}") + return False + + except Exception as e: + logger.error(f"执行PyLabRobot工作流失败: {e}") + return False + + def _execute_liquid_transfer(self, parameters: Dict[str, Any]) -> bool: + """执行液体转移工作流""" + try: + # 1. 解析物料引用 + sources = self._resolve_containers(parameters.get('sources', [])) + targets = self._resolve_containers(parameters.get('targets', [])) + tip_racks = self._resolve_tip_racks(parameters.get('tip_racks', [])) + + if not sources or not targets: + raise ValueError("液体转移需要指定源容器和目标容器") + + if not tip_racks: + logger.warning("未指定枪头架,将尝试自动查找") + tip_racks = self._find_available_tip_racks() + + # 2. 执行液体转移 + volumes = parameters.get('volumes', []) + if not volumes: + volumes = [100.0] * len(sources) # 默认体积 + + # 如果是同步接口 + if hasattr(self.hardware_interface, 'transfer_liquid'): + result = self.hardware_interface.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=tip_racks, + asp_vols=volumes, + dis_vols=volumes, + **parameters.get('options', {}) + ) + else: + # 异步接口需要特殊处理 + asyncio.run(self._async_liquid_transfer(sources, targets, tip_racks, volumes, parameters)) + result = True + + if result: + logger.info(f"液体转移工作流完成: {len(sources)}个源 -> {len(targets)}个目标") + + return bool(result) + + except Exception as e: + logger.error(f"执行液体转移失败: {e}") + return False + + async def _async_liquid_transfer(self, sources, targets, tip_racks, volumes, parameters): + """异步液体转移""" + await self.hardware_interface.transfer_liquid( + sources=sources, + targets=targets, + tip_racks=tip_racks, + asp_vols=volumes, + dis_vols=volumes, + **parameters.get('options', {}) + ) + + def _execute_tip_operations(self, parameters: Dict[str, Any]) -> bool: + """执行枪头操作工作流""" + try: + operation = parameters.get('operation', 'pickup') + tip_racks = self._resolve_tip_racks(parameters.get('tip_racks', [])) + + if not tip_racks: + raise ValueError("枪头操作需要指定枪头架") + + if operation == 'pickup': + result = self.hardware_interface.pickup_tips(tip_racks[0]) + logger.info("枪头拾取完成") + elif operation == 'drop': + result = self.hardware_interface.drop_tips() + logger.info("枪头丢弃完成") + else: + raise ValueError(f"不支持的枪头操作: {operation}") + + return bool(result) + + except Exception as e: + logger.error(f"执行枪头操作失败: {e}") + return False + + def _execute_plate_handling(self, parameters: Dict[str, Any]) -> bool: + """执行板类处理工作流""" + try: + operation = parameters.get('operation', 'move') + source_position = parameters.get('source_position') + target_position = parameters.get('target_position') + + if operation == 'move' and source_position and target_position: + # 移动板类 + result = self.hardware_interface.move_plate(source_position, target_position) + logger.info(f"板类移动完成: {source_position} -> {target_position}") + else: + logger.warning(f"不支持的板类操作或参数不完整: {operation}") + return False + + return bool(result) + + except Exception as e: + logger.error(f"执行板类处理失败: {e}") + return False + + def _resolve_containers(self, container_names: List[str]): + """解析容器名称为实际容器对象""" + containers = [] + for name in container_names: + try: + container = self.material_management.find_material_by_id(name) + if container: + containers.append(container) + else: + logger.warning(f"未找到容器: {name}") + except: + logger.warning(f"解析容器失败: {name}") + return containers + + def _resolve_tip_racks(self, tip_rack_names: List[str]): + """解析枪头架名称为实际对象""" + tip_racks = [] + for name in tip_rack_names: + try: + tip_rack = self.material_management.find_by_category("tip_rack") + matching_racks = [rack for rack in tip_rack if rack.name == name] + if matching_racks: + tip_racks.extend(matching_racks) + else: + logger.warning(f"未找到枪头架: {name}") + except: + logger.warning(f"解析枪头架失败: {name}") + return tip_racks + + def _find_available_tip_racks(self): + """查找可用的枪头架""" + try: + tip_racks = self.material_management.find_by_category("tip_rack") + available_racks = [rack for rack in tip_racks if hasattr(rack, 'has_tips') and rack.has_tips()] + return available_racks[:1] # 返回第一个可用的枪头架 + except: + return [] + + def stop_workflow(self, emergency: bool = False) -> bool: + """停止工作流""" + try: + if emergency: + if hasattr(self.hardware_interface, 'emergency_stop'): + return self.hardware_interface.emergency_stop() + else: + logger.warning("设备不支持紧急停止") + return False + else: + if hasattr(self.hardware_interface, 'graceful_stop'): + return self.hardware_interface.graceful_stop() + elif hasattr(self.hardware_interface, 'stop'): + return self.hardware_interface.stop() + else: + logger.warning("设备不支持优雅停止") + return False + + except Exception as e: + logger.error(f"停止PyLabRobot工作流失败: {e}") + return False + + +class ProxyWorkflowExecutor(WorkflowExecutor): + """代理工作流执行器 - 处理代理模式的工作流""" + + def __init__(self, workstation: 'WorkstationBase'): + super().__init__(workstation) + + # 验证代理接口 + if not isinstance(self.hardware_interface, str) or not self.hardware_interface.startswith("proxy:"): + raise RuntimeError("工作站硬件接口不是有效的代理字符串") + + self.device_id = self.hardware_interface[6:] # 移除 "proxy:" 前缀 + + def execute_workflow(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: + """执行代理工作流""" + try: + # 通过协议节点调用目标设备的工作流 + if self.workstation._protocol_node: + return self.workstation._protocol_node.call_device_method( + self.device_id, 'execute_workflow', workflow_name, parameters + ) + else: + logger.error("代理模式需要protocol_node") + return False + + except Exception as e: + logger.error(f"执行代理工作流失败: {e}") + return False + + def stop_workflow(self, emergency: bool = False) -> bool: + """停止代理工作流""" + try: + if self.workstation._protocol_node: + return self.workstation._protocol_node.call_device_method( + self.device_id, 'stop_workflow', emergency + ) + else: + logger.error("代理模式需要protocol_node") + return False + + except Exception as e: + logger.error(f"停止代理工作流失败: {e}") + return False + + +# 辅助函数 +def get_executor_for_interface(hardware_interface) -> str: + """根据硬件接口类型获取执行器类型名称""" + if isinstance(hardware_interface, str) and hardware_interface.startswith("proxy:"): + return "ProxyWorkflowExecutor" + elif hasattr(hardware_interface, 'write_register') and hasattr(hardware_interface, 'read_register'): + return "ModbusWorkflowExecutor" + elif hasattr(hardware_interface, 'post') or hasattr(hardware_interface, 'get'): + return "HttpWorkflowExecutor" + elif hasattr(hardware_interface, 'transfer_liquid') or hasattr(hardware_interface, 'pickup_tips'): + return "PyLabRobotWorkflowExecutor" + else: + return "UnknownExecutor" diff --git a/unilabos/devices/work_station/workstation_base.py b/unilabos/devices/workstation/workstation_base.py similarity index 56% rename from unilabos/devices/work_station/workstation_base.py rename to unilabos/devices/workstation/workstation_base.py index 97444ccb..4b3c192b 100644 --- a/unilabos/devices/work_station/workstation_base.py +++ b/unilabos/devices/workstation/workstation_base.py @@ -1,74 +1,25 @@ """ 工作站基类 -Workstation Base Class +Workstation Base Class - 单接口模式 -集成通信、物料管理和工作流的工作站基类 -融合子设备管理、动态工作流注册等高级功能 +基于单一硬件接口的简化工作站架构 +支持直接模式和代理模式的自动工作流执行器选择 """ -import asyncio -import json import time import traceback -from typing import Dict, Any, List, Optional, Union, Callable +from typing import Dict, Any, List, Optional, Union, TYPE_CHECKING from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum -from rclpy.action import ActionServer, ActionClient -from rclpy.action.server import ServerGoalHandle -from rclpy.callback_groups import ReentrantCallbackGroup -from rclpy.service import Service -from unilabos_msgs.srv import SerialCommand -from unilabos_msgs.msg import Resource +if TYPE_CHECKING: + from unilabos.ros.nodes.presets.protocol_node import ROS2WorkstationNode -from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker -from unilabos.device_comms.workstation_material_management import MaterialManagementBase -from unilabos.device_comms.workstation_http_service import ( +from unilabos.devices.work_station.workstation_material_management import MaterialManagementBase +from unilabos.devices.work_station.workstation_http_service import ( WorkstationHTTPService, WorkstationReportRequest, MaterialUsage ) -from unilabos.ros.msgs.message_converter import convert_to_ros_msg, convert_from_ros_msg from unilabos.utils.log import logger -from unilabos.utils.type_check import serialize_result_info - - -class DeviceType(Enum): - """设备类型枚举""" - LOGICAL = "logical" # 逻辑设备 - COMMUNICATION = "communication" # 通信设备 (modbus/opcua/serial) - PROTOCOL = "protocol" # 协议设备 - - -@dataclass -class CommunicationInterface: - """通信接口配置""" - device_id: str # 通信设备ID - read_method: str # 读取方法名 - write_method: str # 写入方法名 - protocol_type: str # 协议类型 (modbus/opcua/serial) - config: Dict[str, Any] # 协议特定配置 - - -@dataclass -class WorkflowStep: - """工作流步骤定义""" - device_id: str - action_name: str - action_kwargs: Dict[str, Any] - depends_on: Optional[List[str]] = None # 依赖的步骤ID - step_id: Optional[str] = None - timeout: Optional[float] = None - retry_count: int = 0 - - -@dataclass -class WorkflowDefinition: - """工作流定义""" - name: str - description: str - steps: List[WorkflowStep] - input_schema: Dict[str, Any] - output_schema: Dict[str, Any] - metadata: Dict[str, Any] class WorkflowStatus(Enum): @@ -94,65 +45,57 @@ class WorkflowInfo: parameters_schema: Dict[str, Any] # 参数架构 -@dataclass -class CommunicationConfig: - """通信配置""" - protocol: str - host: str - port: int - timeout: float = 5.0 - retry_count: int = 3 - extra_params: Dict[str, Any] = None - - class WorkstationBase(ABC): - """工作站基类 + """工作站基类 - 单接口模式 - 提供工作站的核心功能: - 1. 物料管理 - 基于PyLabRobot的物料系统 - 2. 工作流控制 - 支持动态注册和静态预定义工作流 - 3. 状态监控 - 设备状态和生产数据监控 - 4. HTTP服务 - 接收外部报送和状态查询 - - 注意:子设备管理和通信转发功能已移入ROS2ProtocolNode + 核心设计原则: + 1. 每个工作站只有一个 hardware_interface + 2. 根据接口类型自动选择工作流执行器 + 3. 支持直接模式和代理模式 + 4. 统一的设备操作接口 """ def __init__( self, device_id: str, deck_config: Optional[Dict[str, Any]] = None, - http_service_config: Optional[Dict[str, Any]] = None, # HTTP服务配置 + http_service_config: Optional[Dict[str, Any]] = None, *args, **kwargs, ): - # 保存工作站基本配置 + # 基本配置 self.device_id = device_id self.deck_config = deck_config or {"size_x": 1000.0, "size_y": 1000.0, "size_z": 500.0} - # HTTP服务配置 - 现在专门用于报送接收 + # HTTP服务配置 self.http_service_config = http_service_config or { "enabled": True, "host": "127.0.0.1", - "port": 8081 # 默认使用8081端口作为报送接收服务 + "port": 8081 } - # 错误处理和动作追踪 - self.current_action_context = None # 当前正在执行的动作上下文 - self.error_history = [] # 错误历史记录 - self.action_results = {} # 动作结果缓存 + # 单一硬件接口 - 可以是具体客户端对象或代理字符串 + self.hardware_interface: Union[Any, str] = None - # 工作流状态 - 支持静态和动态工作流 + # 协议节点引用(用于代理模式) + self._protocol_node: Optional['ROS2WorkstationNode'] = None + + # 工作流执行器(基于通信接口类型自动选择) + self.workflow_executor: Optional['WorkflowExecutor'] = None + + # 工作流状态 self.current_workflow_status = WorkflowStatus.IDLE self.current_workflow_info = None self.workflow_start_time = None self.workflow_parameters = {} + # 错误处理 + self.error_history = [] + self.action_results = {} + # 支持的工作流(静态预定义) self.supported_workflows: Dict[str, WorkflowInfo] = {} - # 动态注册的工作流 - self.registered_workflows: Dict[str, WorkflowDefinition] = {} - # 初始化工作站模块 self.material_management: MaterialManagementBase = self._create_material_management_module() @@ -163,113 +106,166 @@ class WorkstationBase(ABC): self.http_service = None self._start_http_service() - logger.info(f"工作站基类 {device_id} 初始化完成") + logger.info(f"工作站 {device_id} 初始化完成(单接口模式)") - @abstractmethod - def _create_material_management_module(self) -> MaterialManagementBase: - """创建物料管理模块 - 子类必须实现""" - pass + def set_hardware_interface(self, hardware_interface: Union[Any, str]): + """设置硬件接口""" + self.hardware_interface = hardware_interface + + # 根据接口类型自动创建工作流执行器 + self._setup_workflow_executor() + + logger.info(f"工作站 {self.device_id} 硬件接口设置: {type(hardware_interface).__name__}") - @abstractmethod - def _register_supported_workflows(self): - """注册支持的工作流 - 子类必须实现""" - pass + def set_protocol_node(self, protocol_node: 'ROS2WorkstationNode'): + """设置协议节点引用(用于代理模式)""" + self._protocol_node = protocol_node + logger.info(f"工作站 {self.device_id} 关联协议节点") - def _create_workstation_services(self): - """创建工作站ROS服务""" - def _start_http_service(self): - """启动HTTP报送接收服务""" - if self.http_service_config.get("enabled", True): - try: - self.http_service = WorkstationHTTPService( - host=self.http_service_config.get("host", "127.0.0.1"), - port=self.http_service_config.get("port", 8081), - workstation_handler=self - ) - logger.info(f"HTTP报送接收服务已启动: {self.http_service_config['host']}:{self.http_service_config['port']}") - except Exception as e: - logger.error(f"启动HTTP报送接收服务失败: {e}") - else: - logger.info("HTTP报送接收服务已禁用") - - def _stop_http_service(self): - """停止HTTP报送接收服务""" - if self.http_service: - try: - self.http_service.stop() - logger.info("HTTP报送接收服务已停止") - except Exception as e: - logger.error(f"停止HTTP报送接收服务失败: {e}") - - # ============ 核心业务方法 ============ - - def start_workflow(self, workflow_type: str, parameters: Dict[str, Any] = None) -> bool: - """启动工作流 - 业务逻辑层""" + def _setup_workflow_executor(self): + """根据硬件接口类型自动设置工作流执行器""" + if self.hardware_interface is None: + return + + # 动态导入工作流执行器类 try: - if self.current_workflow_status != WorkflowStatus.IDLE: - logger.warning(f"工作流 {workflow_type} 启动失败:当前状态为 {self.current_workflow_status}") - return False + from unilabos.devices.work_station.workflow_executors import ( + ProxyWorkflowExecutor, ModbusWorkflowExecutor, + HttpWorkflowExecutor, PyLabRobotWorkflowExecutor + ) + except ImportError: + logger.warning("工作流执行器模块未找到,将使用基础执行器") + self.workflow_executor = None + return + + # 检查是否为代理字符串 + if isinstance(self.hardware_interface, str) and self.hardware_interface.startswith("proxy:"): + self.workflow_executor = ProxyWorkflowExecutor(self) + logger.info(f"工作站 {self.device_id} 使用代理工作流执行器") + # 检查是否为Modbus客户端 + elif hasattr(self.hardware_interface, 'write_register') and hasattr(self.hardware_interface, 'read_register'): + self.workflow_executor = ModbusWorkflowExecutor(self) + logger.info(f"工作站 {self.device_id} 使用Modbus工作流执行器") + + # 检查是否为HTTP客户端 + elif hasattr(self.hardware_interface, 'post') or hasattr(self.hardware_interface, 'get'): + self.workflow_executor = HttpWorkflowExecutor(self) + logger.info(f"工作站 {self.device_id} 使用HTTP工作流执行器") + + # 检查是否为PyLabRobot设备 + elif hasattr(self.hardware_interface, 'transfer_liquid') or hasattr(self.hardware_interface, 'pickup_tips'): + self.workflow_executor = PyLabRobotWorkflowExecutor(self) + logger.info(f"工作站 {self.device_id} 使用PyLabRobot工作流执行器") + + else: + logger.warning(f"工作站 {self.device_id} 无法识别硬件接口类型: {type(self.hardware_interface)}") + self.workflow_executor = None + + # ============ 统一的设备操作接口 ============ + + def call_device_method(self, method: str, *args, **kwargs) -> Any: + """调用设备方法的统一接口""" + # 1. 代理模式:通过协议节点转发 + if isinstance(self.hardware_interface, str) and self.hardware_interface.startswith("proxy:"): + if not self._protocol_node: + raise RuntimeError("代理模式需要设置protocol_node") + + device_id = self.hardware_interface[6:] # 移除 "proxy:" 前缀 + return self._protocol_node.call_device_method(device_id, method, *args, **kwargs) + + # 2. 直接模式:直接调用硬件接口方法 + elif self.hardware_interface and hasattr(self.hardware_interface, method): + return getattr(self.hardware_interface, method)(*args, **kwargs) + + else: + raise AttributeError(f"硬件接口不支持方法: {method}") + + def get_device_status(self) -> Dict[str, Any]: + """获取设备状态""" + try: + return self.call_device_method('get_status') + except AttributeError: + # 如果设备不支持get_status方法,返回基础状态 + return { + "status": "unknown", + "interface_type": type(self.hardware_interface).__name__, + "timestamp": time.time() + } + + def is_device_available(self) -> bool: + """检查设备是否可用""" + try: + self.get_device_status() + return True + except: + return False + + # ============ 工作流控制接口 ============ + + def execute_workflow(self, workflow_name: str, parameters: Dict[str, Any]) -> bool: + """执行工作流 - 委托给工作流执行器""" + if not self.workflow_executor: + logger.error(f"工作站 {self.device_id} 工作流执行器未初始化") + return False + + try: # 设置工作流状态 self.current_workflow_status = WorkflowStatus.INITIALIZING - self.workflow_parameters = parameters or {} + self.workflow_parameters = parameters self.workflow_start_time = time.time() - # 执行具体的工作流启动逻辑 - success = self._execute_start_workflow(workflow_type, parameters or {}) + # 委托给工作流执行器 + success = self.workflow_executor.execute_workflow(workflow_name, parameters) if success: self.current_workflow_status = WorkflowStatus.RUNNING - logger.info(f"工作流 {workflow_type} 启动成功") + logger.info(f"工作站 {self.device_id} 工作流 {workflow_name} 启动成功") else: self.current_workflow_status = WorkflowStatus.ERROR - logger.error(f"工作流 {workflow_type} 启动失败") + logger.error(f"工作站 {self.device_id} 工作流 {workflow_name} 启动失败") return success except Exception as e: self.current_workflow_status = WorkflowStatus.ERROR - logger.error(f"启动工作流失败: {e}") + logger.error(f"工作站 {self.device_id} 执行工作流失败: {e}") return False + def start_workflow(self, workflow_type: str, parameters: Dict[str, Any] = None) -> bool: + """启动工作流 - 兼容旧接口""" + return self.execute_workflow(workflow_type, parameters or {}) + def stop_workflow(self, emergency: bool = False) -> bool: - """停止工作流 - 业务逻辑层""" + """停止工作流""" + if not self.workflow_executor: + logger.warning(f"工作站 {self.device_id} 工作流执行器未初始化") + return True + try: if self.current_workflow_status in [WorkflowStatus.IDLE, WorkflowStatus.STOPPED]: - logger.warning("没有正在运行的工作流") + logger.warning(f"工作站 {self.device_id} 没有正在运行的工作流") return True self.current_workflow_status = WorkflowStatus.STOPPING - # 执行具体的工作流停止逻辑 - success = self._execute_stop_workflow(emergency) + # 委托给工作流执行器 + success = self.workflow_executor.stop_workflow(emergency) if success: self.current_workflow_status = WorkflowStatus.STOPPED - logger.info(f"工作流停止成功 (紧急: {emergency})") + logger.info(f"工作站 {self.device_id} 工作流停止成功 (紧急: {emergency})") else: self.current_workflow_status = WorkflowStatus.ERROR - logger.error(f"工作流停止失败") + logger.error(f"工作站 {self.device_id} 工作流停止失败") return success except Exception as e: self.current_workflow_status = WorkflowStatus.ERROR - logger.error(f"停止工作流失败: {e}") + logger.error(f"工作站 {self.device_id} 停止工作流失败: {e}") return False - # ============ 抽象方法 - 子类必须实现具体的工作流控制 ============ - - @abstractmethod - def _execute_start_workflow(self, workflow_type: str, parameters: Dict[str, Any]) -> bool: - """执行启动工作流的具体逻辑 - 子类实现""" - pass - - @abstractmethod - def _execute_stop_workflow(self, emergency: bool) -> bool: - """执行停止工作流的具体逻辑 - 子类实现""" - pass - # ============ 状态属性 ============ @property @@ -303,11 +299,25 @@ class WorkstationBase(ABC): """获取最后一个错误""" return self.error_history[-1] if self.error_history else None + # ============ 抽象方法 - 子类必须实现 ============ + + @abstractmethod + def _create_material_management_module(self) -> MaterialManagementBase: + """创建物料管理模块 - 子类必须实现""" + pass + + @abstractmethod + def _register_supported_workflows(self): + """注册支持的工作流 - 子类必须实现""" + pass + + # ============ HTTP服务管理 ============ + def _start_http_service(self): """启动HTTP报送接收服务""" try: if not self.http_service_config.get("enabled", True): - logger.info("HTTP报送接收服务已禁用") + logger.info(f"工作站 {self.device_id} HTTP报送接收服务已禁用") return host = self.http_service_config.get("host", "127.0.0.1") @@ -322,7 +332,7 @@ class WorkstationBase(ABC): logger.info(f"工作站 {self.device_id} HTTP报送接收服务启动成功: {host}:{port}") except Exception as e: - logger.error(f"启动HTTP报送接收服务失败: {e}") + logger.error(f"工作站 {self.device_id} 启动HTTP报送接收服务失败: {e}") self.http_service = None def _stop_http_service(self): @@ -331,12 +341,9 @@ class WorkstationBase(ABC): if self.http_service: self.http_service.stop() self.http_service = None - logger.info("HTTP报送接收服务已停止") + logger.info(f"工作站 {self.device_id} HTTP报送接收服务已停止") except Exception as e: - logger.error(f"停止HTTP报送接收服务失败: {e}") - logger.error(f"停止HTTP报送接收服务失败: {e}") - - # ============ 报送处理方法 ============ + logger.error(f"工作站 {self.device_id} 停止HTTP报送接收服务失败: {e}") # ============ 报送处理方法 ============ diff --git a/unilabos/devices/work_station/workstation_http_service.py b/unilabos/devices/workstation/workstation_http_service.py similarity index 100% rename from unilabos/devices/work_station/workstation_http_service.py rename to unilabos/devices/workstation/workstation_http_service.py diff --git a/unilabos/devices/work_station/workstation_material_management.py b/unilabos/devices/workstation/workstation_material_management.py similarity index 100% rename from unilabos/devices/work_station/workstation_material_management.py rename to unilabos/devices/workstation/workstation_material_management.py diff --git a/unilabos/registry/devices/work_station.yaml b/unilabos/registry/devices/work_station.yaml index d8bb21ac..c96c68a6 100644 --- a/unilabos/registry/devices/work_station.yaml +++ b/unilabos/registry/devices/work_station.yaml @@ -6112,7 +6112,7 @@ workstation: title: initialize_device参数 type: object type: UniLabJsonCommand - module: unilabos.ros.nodes.presets.protocol_node:ROS2ProtocolNode + module: unilabos.ros.nodes.presets.protocol_node:ROS2WorkstationNode status_types: {} type: ros2 config_info: [] diff --git a/unilabos/ros/msgs/message_converter.py b/unilabos/ros/msgs/message_converter.py index b0716aab..657a2077 100644 --- a/unilabos/ros/msgs/message_converter.py +++ b/unilabos/ros/msgs/message_converter.py @@ -51,6 +51,8 @@ SendCmd = msg_converter_manager.get_class("unilabos_msgs.action:SendCmd") imsg = msg_converter_manager.get_module("unilabos.messages") Point3D = msg_converter_manager.get_class("unilabos.messages:Point3D") +from control_msgs.action import * + # 基本消息类型映射 _msg_mapping: Dict[Type, Type] = { float: Float64, diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 612acb12..f88c27d1 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -50,7 +50,7 @@ from unilabos_msgs.msg import Resource # type: ignore from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker from unilabos.ros.x.rclpyx import get_event_loop -from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator +from unilabos.ros.utils.driver_creator import WorkstationNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.utils.async_util import run_async_func from unilabos.utils.log import info, debug, warning, error, critical, logger, trace from unilabos.utils.type_check import get_type_class, TypeEncoder, serialize_result_info @@ -340,14 +340,11 @@ class BaseROS2DeviceNode(Node, Generic[T]): # 物料传输到对应的node节点 rclient = self.create_client(ResourceAdd, "/resources/add") rclient.wait_for_service() - rclient2 = self.create_client(ResourceAdd, "/resources/add") - rclient2.wait_for_service() request = ResourceAdd.Request() - request2 = ResourceAdd.Request() + command_json = json.loads(req.command) namespace = command_json["namespace"] bind_parent_id = command_json["bind_parent_id"] - edge_device_id = command_json["edge_device_id"] location = command_json["bind_location"] other_calling_param = command_json["other_calling_param"] resources = command_json["resource"] @@ -357,7 +354,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): LIQUID_VOLUME = other_calling_param.pop("LIQUID_VOLUME", []) LIQUID_INPUT_SLOT = other_calling_param.pop("LIQUID_INPUT_SLOT", []) slot = other_calling_param.pop("slot", "-1") - resource = None + parent = None if slot != "-1": # slot为负数的时候采用assign方法 other_calling_param["slot"] = slot # 本地拿到这个物料,可能需要先做初始化? @@ -385,20 +382,20 @@ class BaseROS2DeviceNode(Node, Generic[T]): logger.info(f"添加物料{container_query_dict['name']}到资源跟踪器") else: assert len(found_resources) == 1, f"找到多个同名物料: {container_query_dict['name']}, 请检查物料系统" - resource = found_resources[0] - if isinstance(resource, Resource): - regular_container = RegularContainer(resource.id) - regular_container.ulr_resource = resource + parent = found_resources[0] + if isinstance(parent, Resource): + regular_container = RegularContainer(parent.id) + regular_container.ulr_resource = parent regular_container.ulr_resource_data.update(json.loads(container_instance.data)) - logger.info(f"更新物料{container_query_dict['name']}的数据{resource.data} ULR") - elif isinstance(resource, dict): - if "data" not in resource: - resource["data"] = {} - resource["data"].update(json.loads(container_instance.data)) - request.resources[0].name = resource["name"] - logger.info(f"更新物料{container_query_dict['name']}的数据{resource['data']} dict") + logger.info(f"更新物料{container_query_dict['name']}的数据{parent.data} ULR") + elif isinstance(parent, dict): + if "data" not in parent: + parent["data"] = {} + parent["data"].update(json.loads(container_instance.data)) + request.resources[0].name = parent["name"] + logger.info(f"更新物料{container_query_dict['name']}的数据{parent['data']} dict") else: - logger.info(f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(resource)} {resource}") + logger.info(f"更新物料{container_query_dict['name']}出现不支持的数据类型{type(parent)} {parent}") response = await rclient.call_async(request) # 应该先add_resource了 res.response = "OK" @@ -423,18 +420,16 @@ class BaseROS2DeviceNode(Node, Generic[T]): return res # 接下来该根据bind_parent_id进行assign了,目前只有plr可以进行assign,不然没有办法输入到物料系统中 if bind_parent_id != self.node_name: - resource = self.resource_tracker.figure_resource({"name": bind_parent_id}) # 拿到父节点,进行具体assign等操作 + parent = self.resource_tracker.figure_resource({"name": bind_parent_id}) # 拿到父节点,进行具体assign等操作 # request.resources = [convert_to_ros_msg(Resource, resources)] try: from pylabrobot.resources.resource import Resource as ResourcePLR from pylabrobot.resources.deck import Deck - from pylabrobot.resources import Coordinate - from pylabrobot.resources import OTDeck - from pylabrobot.resources import Plate + from pylabrobot.resources import Coordinate, OTDeck, Plate - contain_model = not isinstance(resource, Deck) - if isinstance(resource, ResourcePLR): + contain_model = not isinstance(parent, Deck) + if isinstance(parent, ResourcePLR): # resources.list() resources_tree = dict_to_tree(copy.deepcopy({r["id"]: r for r in resources})) plr_instance = resource_ulab_to_plr(resources_tree[0], contain_model) @@ -445,20 +440,22 @@ class BaseROS2DeviceNode(Node, Generic[T]): ): empty_liquid_info_in[liquid_input_slot] = (liquid_type, liquid_volume) plr_instance.set_well_liquids(empty_liquid_info_in) - if isinstance(resource, OTDeck) and "slot" in other_calling_param: + if isinstance(parent, OTDeck) and "slot" in other_calling_param: other_calling_param["slot"] = int(other_calling_param["slot"]) - resource.assign_child_at_slot(plr_instance, **other_calling_param) + parent.assign_child_at_slot(plr_instance, **other_calling_param) else: _discard_slot = other_calling_param.pop("slot", "-1") - resource.assign_child_resource( + parent.assign_child_resource( plr_instance, Coordinate(location["x"], location["y"], location["z"]), **other_calling_param, ) + + request2 = ResourceAdd.Request() request2.resources = [ - convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(resource)]) + convert_to_ros_msg(Resource, r) for r in tree_to_list([resource_plr_to_ulab(parent)]) ] - rclient2.call(request2) + rclient.call(request2) # 发送给ResourceMeshManager action_client = ActionClient( self, @@ -947,7 +944,7 @@ class ROS2DeviceNode: # TODO: 要在创建之前预先请求服务器是否有当前id的物料,放到resource_tracker中,让pylabrobot进行创建 # 创建设备类实例 - # 判断是否包含设备子节点,决定是否使用ROS2ProtocolNode + # 判断是否包含设备子节点,决定是否使用ROS2WorkstationNode has_device_children = any( child_config.get("type", "device") == "device" for child_config in children.values() @@ -961,17 +958,17 @@ class ROS2DeviceNode: driver_class, children=children, resource_tracker=self.resource_tracker ) else: - from unilabos.ros.nodes.presets.protocol_node import ROS2ProtocolNode - from unilabos.device_comms.workstation_base import WorkstationBase + from unilabos.ros.nodes.presets.protocol_node import ROS2WorkstationNode + from unilabos.devices.work_station.workstation_base import WorkstationBase # 检查是否是WorkstationBase的子类且包含设备子节点 if issubclass(self._driver_class, WorkstationBase) and has_device_children: - # WorkstationBase + 设备子节点 -> 使用ProtocolNode作为ros_instance + # WorkstationBase + 设备子节点 -> 使用WorkstationNode作为ros_instance self._use_protocol_node_ros = True self._driver_creator = DeviceClassCreator(driver_class, children=children, resource_tracker=self.resource_tracker) - elif issubclass(self._driver_class, ROS2ProtocolNode): # 是ProtocolNode的子节点,就要调用ProtocolNodeCreator + elif issubclass(self._driver_class, ROS2WorkstationNode): # 是WorkstationNode的子节点,就要调用WorkstationNodeCreator self._use_protocol_node_ros = False - self._driver_creator = ProtocolNodeCreator(driver_class, children=children, resource_tracker=self.resource_tracker) + self._driver_creator = WorkstationNodeCreator(driver_class, children=children, resource_tracker=self.resource_tracker) else: self._use_protocol_node_ros = False self._driver_creator = DeviceClassCreator(driver_class, children=children, resource_tracker=self.resource_tracker) @@ -988,8 +985,8 @@ class ROS2DeviceNode: if driver_is_ros: self._ros_node = self._driver_instance # type: ignore elif hasattr(self, '_use_protocol_node_ros') and self._use_protocol_node_ros: - # WorkstationBase + 设备子节点 -> 创建ROS2ProtocolNode作为ros_instance - from unilabos.ros.nodes.presets.protocol_node import ROS2ProtocolNode + # WorkstationBase + 设备子节点 -> 创建ROS2WorkstationNode作为ros_instance + from unilabos.ros.nodes.presets.protocol_node import ROS2WorkstationNode # 从children提取设备协议类型 protocol_types = set() @@ -1006,7 +1003,7 @@ class ROS2DeviceNode: if not protocol_types: protocol_types = ["default_protocol"] - self._ros_node = ROS2ProtocolNode( + self._ros_node = ROS2WorkstationNode( device_id=device_id, children=children, protocol_type=list(protocol_types), diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 5fe90684..0f9f951e 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -523,7 +523,7 @@ class HostNode(BaseROS2DeviceNode): # 解析设备名和属性名 parts = topic.split("/") - if len(parts) >= 4: # 可能有ProtocolNode,创建更长的设备 + if len(parts) >= 4: # 可能有WorkstationNode,创建更长的设备 device_id = "/".join(parts[2:-1]) property_name = parts[-1] diff --git a/unilabos/ros/nodes/presets/protocol_node.py b/unilabos/ros/nodes/presets/protocol_node.py index ad835ee1..c8c338c3 100644 --- a/unilabos/ros/nodes/presets/protocol_node.py +++ b/unilabos/ros/nodes/presets/protocol_node.py @@ -28,9 +28,9 @@ from unilabos.utils.log import error from unilabos.utils.type_check import serialize_result_info -class ROS2ProtocolNode(BaseROS2DeviceNode): +class ROS2WorkstationNode(BaseROS2DeviceNode): """ - ROS2ProtocolNode代表管理ROS2环境中设备通信和动作的协议节点。 + ROS2WorkstationNode代表管理ROS2环境中设备通信和动作的协议节点。 它初始化设备节点,处理动作客户端,并基于指定的协议执行工作流。 它还物理上代表一组协同工作的设备,如带夹持器的机械臂,带传送带的CNC机器等。 """ @@ -43,7 +43,8 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): children: dict, protocol_type: Union[str, list[str]], resource_tracker: DeviceNodeResourceTracker, - workstation_config: dict = None, # 新增:工作站配置 + workstation_config: dict = {}, + workstation_instance: object = None, *args, **kwargs, ): @@ -55,10 +56,12 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): self.communication_interfaces = self.workstation_config.get('communication_interfaces', {}) # 从工作站配置获取通信接口 # 新增:获取工作站实例(如果存在) - self.workstation_instance = self.workstation_config.get('workstation_instance') + self.workstation_instance = workstation_instance self._busy = False self.sub_devices = {} + self.communication_devices = {} + self.logical_devices = {} self._goals = {} self._protocol_servers = {} self._action_clients = {} @@ -72,73 +75,38 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): device_id=device_id, status_types={}, action_value_mappings=self.protocol_action_mappings, - hardware_interface={}, + hardware_interface={"name": "hardware_interface", "write": "send_command", "read": "read_data", "extra_info": []}, print_publish=False, resource_tracker=resource_tracker, ) # 初始化子设备 - self.communication_node_id_to_instance = {} self._initialize_child_devices() - # 设置硬件接口代理 - self._setup_hardware_proxies() - - # 新增:如果有工作站实例,建立双向引用 - if self.workstation_instance: - self.workstation_instance._protocol_node = self - self._setup_workstation_method_proxies() - self.lab_logger().info(f"ROS2ProtocolNode {device_id} 与工作站实例 {type(self.workstation_instance).__name__} 关联") - - self.lab_logger().info(f"ROS2ProtocolNode {device_id} initialized with protocols: {self.protocol_names}") - - def _setup_workstation_method_proxies(self): - """设置工作站方法代理""" - if not self.workstation_instance: - return - - # 代理工作站的核心方法 - workstation_methods = [ - 'start_workflow', 'stop_workflow', 'workflow_status', 'is_busy', - 'process_material_change_report', 'process_step_finish_report', - 'process_sample_finish_report', 'process_order_finish_report', - 'handle_external_error' - ] - - for method_name in workstation_methods: - if hasattr(self.workstation_instance, method_name): - # 创建代理方法 - setattr(self, method_name, getattr(self.workstation_instance, method_name)) - self.lab_logger().debug(f"代理工作站方法: {method_name}") - - # ============ 工作站方法代理 ============ - - def get_workstation_status(self) -> Dict[str, Any]: - """获取工作站状态""" - if self.workstation_instance: - return { - 'workflow_status': str(self.workstation_instance.workflow_status.value), - 'is_busy': self.workstation_instance.is_busy, - 'workflow_runtime': self.workstation_instance.workflow_runtime, - 'error_count': self.workstation_instance.error_count, - 'last_error': self.workstation_instance.last_error - } - return {'status': 'no_workstation_instance'} - - def delegate_to_workstation(self, method_name: str, *args, **kwargs): - """委托方法调用给工作站实例""" - if self.workstation_instance and hasattr(self.workstation_instance, method_name): - method = getattr(self.workstation_instance, method_name) - return method(*args, **kwargs) + if isinstance(getattr(driver_instance, "hardware_interface", None), str): + self.logical_devices[device_id] = driver_instance else: - self.lab_logger().warning(f"工作站实例不存在或没有方法: {method_name}") - return None + self.communication_devices[device_id] = driver_instance + + # 设置硬件接口代理 + for device_id, device_node in self.logical_devices.items(): + if device_node and hasattr(device_node, 'ros_node_instance'): + self._setup_device_hardware_proxy(device_id, device_node) + + # 新增:如果有工作站实例,建立双向引用和硬件接口设置 + if self.workstation_instance: + self._setup_workstation_integration() + + def _setup_workstation_integration(self): + """设置工作站集成 - 统一设备处理模式""" + # 1. 建立协议节点引用 + self.workstation_instance.set_protocol_node(self) + + self.lab_logger().info(f"ROS2WorkstationNode {self.device_id} 与工作站实例 {type(self.workstation_instance).__name__} 集成完成") def _initialize_child_devices(self): """初始化子设备 - 重构为更清晰的方法""" # 设备分类字典 - 统一管理 - self.communication_devices = {} - self.logical_devices = {} for device_id, device_config in self.children.items(): if device_config.get("type", "device") != "device": @@ -157,7 +125,6 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): # 兼容旧的ID匹配方式和新的配置方式 if device_type == "communication" or "serial_" in device_id or "io_" in device_id: - self.communication_node_id_to_instance[device_id] = d # 保持向后兼容 self.communication_devices[device_id] = d # 新的统一方式 self.lab_logger().info(f"通信设备 {device_id} 初始化并分类成功") elif device_type == "logical": @@ -171,91 +138,100 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): except Exception as ex: self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}") - def _setup_hardware_proxies(self): - """设置硬件接口代理 - 重构为独立方法,支持工作站配置""" - # 1. 传统的协议节点硬件代理设置 - for device_id, device_config in self.children.items(): - if device_config.get("type", "device") != "device": - continue - - # 设置硬件接口代理 - if device_id not in self.sub_devices: - self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...") - continue - - d = self.sub_devices[device_id] - if d: - self._setup_device_hardware_proxy(device_id, d) + def _setup_device_hardware_proxy(self, device_id: str, device: ROS2DeviceNode): + """统一的设备硬件接口代理设置方法 - # 2. 工作站配置的通信接口代理设置 - if hasattr(self, 'communication_interfaces') and self.communication_interfaces: - self._setup_workstation_communication_interfaces() - - self.lab_logger().info(f"ROS2ProtocolNode {self.device_id} initialized with protocols: {self.protocol_names}") - - def _setup_workstation_communication_interfaces(self): - """设置工作站特定的通信接口代理""" - for logical_device_id, logical_device in self.logical_devices.items(): - # 检查是否有配置的通信接口 - interface_config = getattr(self, 'communication_interfaces', {}).get(logical_device_id) - if not interface_config: - continue - - comm_device = self.communication_devices.get(interface_config.device_id) - if not comm_device: - self.lab_logger().error(f"通信设备 {interface_config.device_id} 不存在") - continue - - # 设置工作站级别的通信代理 - self._setup_workstation_hardware_proxy( - logical_device, - comm_device, - interface_config - ) - - def _setup_workstation_hardware_proxy(self, logical_device, comm_device, interface_config): - """为逻辑设备设置工作站级通信代理""" - try: - # 获取通信设备的读写方法 - read_func = getattr(comm_device.driver_instance, interface_config.read_method, None) - write_func = getattr(comm_device.driver_instance, interface_config.write_method, None) - - if read_func: - setattr(logical_device.driver_instance, 'comm_read', read_func) - if write_func: - setattr(logical_device.driver_instance, 'comm_write', write_func) - - # 设置通信配置 - setattr(logical_device.driver_instance, 'comm_config', interface_config.config) - setattr(logical_device.driver_instance, 'comm_protocol', interface_config.protocol_type) - - self.lab_logger().info(f"为逻辑设备 {logical_device.device_id} 设置工作站通信代理 -> {comm_device.device_id}") - - except Exception as e: - self.lab_logger().error(f"设置工作站通信代理失败: {e}") - - def _setup_device_hardware_proxy(self, device_id: str, device): - """为单个设备设置硬件接口代理""" + Args: + device_id: 设备ID + device: 设备实例 + """ hardware_interface = device.ros_node_instance._hardware_interface - if ( + if not self._validate_hardware_interface(device, hardware_interface): + return + + # 获取硬件接口名称 + interface_name = getattr(device.driver_instance, hardware_interface["name"]) + + # 情况1: 如果interface_name是字符串,说明需要转发到其他设备 + if isinstance(interface_name, str): + # 查找目标设备 + communication_device = self.communication_devices.get(device_id, None) + if not communication_device: + self.lab_logger().error(f"转发目标设备 {device_id} 不存在") + return + + read_method = hardware_interface.get("read", None) + write_method = hardware_interface.get("write", None) + + # 设置传统硬件代理 + communicate_hardware_info = communication_device.ros_node_instance._hardware_interface + self._setup_hardware_proxy(device, communication_device, read_method, write_method) + self.lab_logger().info( + f"传统通信代理:为子设备{device.device_id} " + f"添加了{read_method}方法(来源:{communication_device.device_id} {communicate_hardware_info['read']}) " + f"添加了{write_method}方法(来源:{communication_device.device_id} {communicate_hardware_info['write']})" + ) + self.lab_logger().info(f"字符串转发代理:设备 {device.device_id} -> {device_id}") + + # 情况2: 如果设备有communication_interface配置,设置协议代理 + elif hasattr(self, 'communication_interfaces') and device_id in self.communication_interfaces: + interface_config = self._get_communication_interface_config(device_id) + protocol_type = interface_config.get('protocol_type', 'modbus') + self._setup_communication_proxy(device, interface_config, protocol_type) + + # 情况3: 其他情况,使用默认处理 + else: + self.lab_logger().debug(f"设备 {device_id} 使用默认硬件接口处理") + + def _get_communication_interface_config(self, device_id: str) -> dict: + """获取设备的通信接口配置""" + # 优先从工作站配置获取 + if hasattr(self, 'communication_interfaces') and device_id in self.communication_interfaces: + return self.communication_interfaces[device_id] + + # 从设备自身配置获取 + device_node = self.logical_devices[device_id] + if device_node and hasattr(device_node.driver_instance, 'communication_interface'): + return getattr(device_node.driver_instance, 'communication_interface') + + return {} + + def _validate_hardware_interface(self, device: ROS2DeviceNode, hardware_interface: dict) -> bool: + """验证硬件接口配置""" + return ( hasattr(device.driver_instance, hardware_interface["name"]) and hasattr(device.driver_instance, hardware_interface["write"]) and (hardware_interface["read"] is None or hasattr(device.driver_instance, hardware_interface["read"])) - ): - name = getattr(device.driver_instance, hardware_interface["name"]) - read = hardware_interface.get("read", None) - write = hardware_interface.get("write", None) + ) - # 如果硬件接口是字符串,通过通信设备提供 - if isinstance(name, str) and name in self.sub_devices: - communicate_device = self.sub_devices[name] - communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface - self._setup_hardware_proxy(device, self.sub_devices[name], read, write) - self.lab_logger().info( - f"\n通信代理:为子设备{device_id}\n " - f"添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n " - f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})" - ) + def _setup_communication_proxy(self, logical_device: ROS2DeviceNode, interface_config, protocol_type): + """为逻辑设备设置通信代理 - 统一方法""" + try: + # 获取通信设备 + comm_device_id = interface_config.get('device_id') + comm_device = self.communication_devices.get(comm_device_id) + + if not comm_device: + self.lab_logger().error(f"通信设备 {comm_device_id} 不存在") + return + + # 根据协议类型设置不同的代理方法 + if protocol_type == 'modbus': + self._setup_modbus_proxy(logical_device, comm_device, interface_config) + elif protocol_type == 'opcua': + self._setup_opcua_proxy(logical_device, comm_device, interface_config) + elif protocol_type == 'http': + self._setup_http_proxy(logical_device, comm_device, interface_config) + elif protocol_type == 'serial': + self._setup_serial_proxy(logical_device, comm_device, interface_config) + else: + self.lab_logger().warning(f"不支持的协议类型: {protocol_type}") + return + + self.lab_logger().info(f"通信代理:为逻辑设备 {logical_device.device_id} 设置{protocol_type}通信代理 -> {comm_device_id}") + + except Exception as e: + self.lab_logger().error(f"设置通信代理失败: {e}") def _setup_protocol_names(self, protocol_type): # 处理协议类型 @@ -493,6 +469,99 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): """还没有改过的部分""" + def _setup_modbus_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): + """设置Modbus通信代理""" + config = interface_config.get('config', {}) + + # 设置Modbus读写方法 + def modbus_read(address, count=1, function_code=3): + """Modbus读取方法""" + return comm_device.driver_instance.read_holding_registers( + address=address, + count=count, + slave_id=config.get('slave_id', 1) + ) + + def modbus_write(address, value, function_code=6): + """Modbus写入方法""" + if isinstance(value, (list, tuple)): + return comm_device.driver_instance.write_multiple_registers( + address=address, + values=value, + slave_id=config.get('slave_id', 1) + ) + else: + return comm_device.driver_instance.write_single_register( + address=address, + value=value, + slave_id=config.get('slave_id', 1) + ) + + # 绑定方法到逻辑设备 + setattr(logical_device.driver_instance, 'comm_read', modbus_read) + setattr(logical_device.driver_instance, 'comm_write', modbus_write) + setattr(logical_device.driver_instance, 'comm_config', config) + setattr(logical_device.driver_instance, 'comm_protocol', 'modbus') + + def _setup_opcua_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): + """设置OPC UA通信代理""" + config = interface_config.get('config', {}) + + def opcua_read(node_id): + """OPC UA读取方法""" + return comm_device.driver_instance.read_node_value(node_id) + + def opcua_write(node_id, value): + """OPC UA写入方法""" + return comm_device.driver_instance.write_node_value(node_id, value) + + # 绑定方法到逻辑设备 + setattr(logical_device.driver_instance, 'comm_read', opcua_read) + setattr(logical_device.driver_instance, 'comm_write', opcua_write) + setattr(logical_device.driver_instance, 'comm_config', config) + setattr(logical_device.driver_instance, 'comm_protocol', 'opcua') + + def _setup_http_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): + """设置HTTP/RPC通信代理""" + config = interface_config.get('config', {}) + base_url = config.get('base_url', 'http://localhost:8080') + + def http_read(endpoint, params=None): + """HTTP GET请求""" + url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" + return comm_device.driver_instance.get_request(url, params=params) + + def http_write(endpoint, data): + """HTTP POST请求""" + url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" + return comm_device.driver_instance.post_request(url, data=data) + + # 绑定方法到逻辑设备 + setattr(logical_device.driver_instance, 'comm_read', http_read) + setattr(logical_device.driver_instance, 'comm_write', http_write) + setattr(logical_device.driver_instance, 'comm_config', config) + setattr(logical_device.driver_instance, 'comm_protocol', 'http') + + def _setup_serial_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): + """设置串口通信代理""" + config = interface_config.get('config', {}) + + def serial_read(timeout=1.0): + """串口读取方法""" + return comm_device.driver_instance.read_data(timeout=timeout) + + def serial_write(data): + """串口写入方法""" + if isinstance(data, str): + data = data.encode('utf-8') + return comm_device.driver_instance.write_data(data) + + # 绑定方法到逻辑设备 + setattr(logical_device.driver_instance, 'comm_read', serial_read) + setattr(logical_device.driver_instance, 'comm_write', serial_write) + setattr(logical_device.driver_instance, 'comm_config', config) + setattr(logical_device.driver_instance, 'comm_protocol', 'serial') + def _setup_hardware_proxy( self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method ): diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index 0e84683e..188d65b9 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -3,7 +3,7 @@ from typing import Union, Dict, Any, Optional from unilabos_msgs.msg import Resource from pylabrobot.resources import Resource as PLRResource, Plate, TipRack, Coordinate -from unilabos.ros.nodes.presets.protocol_node import ROS2ProtocolNode +from unilabos.ros.nodes.presets.protocol_node import ROS2WorkstationNode from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker @@ -44,7 +44,7 @@ def get_workstation_plate_resource(name: str) -> PLRResource: # 要给定一个 return plate -class WorkStationExample(ROS2ProtocolNode): +class WorkStationExample(ROS2WorkstationNode): def __init__(self, # 你可以在这里增加任意的参数,对应启动json填写相应的参数内容 device_id: str, diff --git a/unilabos/ros/utils/driver_creator.py b/unilabos/ros/utils/driver_creator.py index 862f04c1..5e16989f 100644 --- a/unilabos/ros/utils/driver_creator.py +++ b/unilabos/ros/utils/driver_creator.py @@ -267,40 +267,40 @@ class PyLabRobotCreator(DeviceClassCreator[T]): ROS2DeviceNode.run_async_func(getattr(self.device_instance, "setup")).add_done_callback(done_cb) -class ProtocolNodeCreator(DeviceClassCreator[T]): +class WorkstationNodeCreator(DeviceClassCreator[T]): """ - ProtocolNode设备类创建器 + WorkstationNode设备类创建器 - 这个类提供了针对ProtocolNode设备类的实例创建方法,处理children参数。 + 这个类提供了针对WorkstationNode设备类的实例创建方法,处理children参数。 """ def __init__(self, cls: Type[T], children: Dict[str, Any], resource_tracker: DeviceNodeResourceTracker): """ - 初始化ProtocolNode设备类创建器 + 初始化WorkstationNode设备类创建器 Args: - cls: ProtocolNode设备类 + cls: WorkstationNode设备类 children: 子资源字典,用于资源替换 """ super().__init__(cls, children, resource_tracker) def create_instance(self, data: Dict[str, Any]) -> T: """ - 从数据创建ProtocolNode设备实例 + 从数据创建WorkstationNode设备实例 Args: data: 用于创建实例的数据 Returns: - ProtocolNode设备类实例 + WorkstationNode设备类实例 """ try: # 创建实例,额外补充一个给protocol node的字段,后面考虑取消 data["children"] = self.children - self.device_instance = super(ProtocolNodeCreator, self).create_instance(data) + self.device_instance = super(WorkstationNodeCreator, self).create_instance(data) self.post_create() return self.device_instance except Exception as e: - logger.error(f"ProtocolNode创建实例失败: {e}") - logger.error(f"ProtocolNode创建实例堆栈: {traceback.format_exc()}") + logger.error(f"WorkstationNode创建实例失败: {e}") + logger.error(f"WorkstationNode创建实例堆栈: {traceback.format_exc()}") raise