From 9f823a4198b64557af95c61bdabac109b1aec3a6 Mon Sep 17 00:00:00 2001 From: Junhan Chang Date: Thu, 21 Aug 2025 10:05:58 +0800 Subject: [PATCH] update workstation base --- unilabos/device_comms/workstation_base.py | 858 ++++++++++++++++-- .../device_comms/workstation_http_service.py | 605 ++++++++++++ unilabos/ros/nodes/presets/protocol_node.py | 135 ++- 3 files changed, 1508 insertions(+), 90 deletions(-) create mode 100644 unilabos/device_comms/workstation_http_service.py diff --git a/unilabos/device_comms/workstation_base.py b/unilabos/device_comms/workstation_base.py index 7b61c17b..887d56e5 100644 --- a/unilabos/device_comms/workstation_base.py +++ b/unilabos/device_comms/workstation_base.py @@ -25,6 +25,9 @@ from unilabos.ros.nodes.presets.protocol_node import ROS2ProtocolNode from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker from unilabos.device_comms.workstation_communication import WorkstationCommunicationBase, CommunicationConfig from unilabos.device_comms.workstation_material_management import MaterialManagementBase +from unilabos.device_comms.workstation_http_service import ( + WorkstationHTTPService, WorkstationReportRequest, MaterialUsage +) from unilabos.ros.msgs.message_converter import convert_to_ros_msg, convert_from_ros_msg from unilabos.utils.log import logger from unilabos.utils.type_check import serialize_result_info @@ -114,6 +117,7 @@ class WorkstationBase(ROS2ProtocolNode, ABC): communication_config: CommunicationConfig, deck_config: Optional[Dict[str, Any]] = None, communication_interfaces: Optional[Dict[str, CommunicationInterface]] = None, + http_service_config: Optional[Dict[str, Any]] = None, # 新增:HTTP服务配置 *args, **kwargs, ): @@ -122,6 +126,18 @@ class WorkstationBase(ROS2ProtocolNode, ABC): self.deck_config = deck_config or {"size_x": 1000.0, "size_y": 1000.0, "size_z": 500.0} self.communication_interfaces = communication_interfaces or {} + # HTTP服务配置 - 现在专门用于报送接收 + self.http_service_config = http_service_config or { + "enabled": True, + "host": "127.0.0.1", + "port": 8081 # 默认使用8081端口作为报送接收服务 + } + + # 错误处理和动作追踪 + self.current_action_context = None # 当前正在执行的动作上下文 + self.error_history = [] # 错误历史记录 + self.action_results = {} # 动作结果缓存 + # 工作流状态 - 支持静态和动态工作流 self.current_workflow_status = WorkflowStatus.IDLE self.current_workflow_info = None @@ -135,28 +151,27 @@ class WorkstationBase(ROS2ProtocolNode, ABC): self.registered_workflows: Dict[str, WorkflowDefinition] = {} self._workflow_action_servers: Dict[str, ActionServer] = {} - # 初始化基类 - ROS2ProtocolNode会处理子设备初始化 + # 初始化基类 - ROS2ProtocolNode会处理所有设备管理 super().__init__( device_id=device_id, children=children, protocol_type=protocol_type, resource_tracker=resource_tracker, + workstation_config={ + 'communication_interfaces': communication_interfaces, + 'deck_config': self.deck_config + }, *args, **kwargs ) - # 工作站特有的设备分类 (基于已初始化的sub_devices) - self.communication_devices: Dict[str, Any] = {} - self.logical_devices: Dict[str, Any] = {} - self._classify_devices() + # 使用父类的设备分类结果(不再重复分类) + # self.communication_devices 和 self.logical_devices 由 ROS2ProtocolNode 提供 # 初始化工作站模块 self.communication: WorkstationCommunicationBase = self._create_communication_module() self.material_management: MaterialManagementBase = self._create_material_management_module() - # 设置工作站特定的通信接口 - self._setup_workstation_communication_interfaces() - # 注册支持的工作流 self._register_supported_workflows() @@ -166,62 +181,12 @@ class WorkstationBase(ROS2ProtocolNode, ABC): # 启动状态监控 self._start_status_monitoring() + # 启动HTTP报送接收服务 + self.http_service = None + self._start_http_service() + logger.info(f"增强工作站基类 {device_id} 初始化完成") - def _classify_devices(self): - """基于已初始化的设备进行分类""" - for device_id, device in self.sub_devices.items(): - device_config = self.children.get(device_id, {}) - device_type = DeviceType(device_config.get("device_type", "logical")) - - if device_type == DeviceType.COMMUNICATION: - self.communication_devices[device_id] = device - logger.info(f"通信设备 {device_id} 已分类") - elif device_type == DeviceType.LOGICAL: - self.logical_devices[device_id] = device - logger.info(f"逻辑设备 {device_id} 已分类") - - def _setup_workstation_communication_interfaces(self): - """设置工作站特定的通信接口代理""" - for logical_device_id, logical_device in self.logical_devices.items(): - # 检查是否有配置的通信接口 - interface_config = self.communication_interfaces.get(logical_device_id) - if not interface_config: - continue - - comm_device = self.communication_devices.get(interface_config.device_id) - if not comm_device: - logger.error(f"通信设备 {interface_config.device_id} 不存在") - continue - - # 设置工作站级别的通信代理 - self._setup_workstation_hardware_proxy( - logical_device, - comm_device, - interface_config - ) - - def _setup_workstation_hardware_proxy(self, logical_device, comm_device, interface: CommunicationInterface): - """为逻辑设备设置工作站级通信代理""" - try: - # 获取通信设备的读写方法 - read_func = getattr(comm_device.driver_instance, interface.read_method, None) - write_func = getattr(comm_device.driver_instance, interface.write_method, None) - - if read_func: - setattr(logical_device.driver_instance, 'comm_read', read_func) - if write_func: - setattr(logical_device.driver_instance, 'comm_write', write_func) - - # 设置通信配置 - setattr(logical_device.driver_instance, 'comm_config', interface.config) - setattr(logical_device.driver_instance, 'comm_protocol', interface.protocol_type) - - logger.info(f"为逻辑设备 {logical_device.device_id} 设置工作站通信代理 -> {comm_device.device_id}") - - except Exception as e: - logger.error(f"设置工作站通信代理失败: {e}") - @abstractmethod def _create_communication_module(self) -> WorkstationCommunicationBase: """创建通信模块 - 子类必须实现""" @@ -355,6 +320,579 @@ class WorkstationBase(ROS2ProtocolNode, ABC): # 目前简化为按需查询 pass + def _start_http_service(self): + """启动HTTP报送接收服务""" + try: + if not self.http_service_config.get("enabled", True): + logger.info("HTTP报送接收服务已禁用") + return + + host = self.http_service_config.get("host", "127.0.0.1") + port = self.http_service_config.get("port", 8081) + + self.http_service = WorkstationHTTPService( + workstation_instance=self, + host=host, + port=port + ) + + self.http_service.start() + logger.info(f"工作站 {self.device_id} HTTP报送接收服务启动成功: {self.http_service.service_url}") + + except Exception as e: + logger.error(f"启动HTTP报送接收服务失败: {e}") + self.http_service = None + + def _stop_http_service(self): + """停止HTTP报送接收服务""" + try: + if self.http_service: + self.http_service.stop() + self.http_service = None + logger.info("HTTP报送接收服务已停止") + except Exception as e: + logger.error(f"停止HTTP报送接收服务失败: {e}") + + # ============ 报送处理方法 ============ + + def process_material_change_report(self, report) -> Dict[str, Any]: + """处理物料变更报送 - 同步到 ResourceTracker 并发送 ROS2 更新""" + try: + logger.info(f"处理物料变更报送: {report.workstation_id} -> {report.resource_id} ({report.change_type})") + + # 增加接收计数 + self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1 + + # 准备变更数据 + changes = { + 'workstation_id': report.workstation_id, + 'timestamp': report.timestamp, + 'change_type': report.change_type, + 'resource_id': report.resource_id + } + + # 添加额外的变更信息 + if hasattr(report, 'new_location'): + changes['location'] = { + 'x': getattr(report.new_location, 'x', 0), + 'y': getattr(report.new_location, 'y', 0), + 'z': getattr(report.new_location, 'z', 0) + } + + if hasattr(report, 'quantity'): + changes['quantity'] = report.quantity + + if hasattr(report, 'status'): + changes['status'] = report.status + + # 同步到 ResourceTracker + sync_success = self.resource_tracker.update_material_state( + report.resource_id, + changes, + report.change_type + ) + + result = { + 'processed': True, + 'resource_id': report.resource_id, + 'change_type': report.change_type, + 'next_actions': [], + 'tracker_sync': sync_success + } + + # 发送 ROS2 ResourceUpdate 请求到 host node + if sync_success: + try: + self._send_resource_update_to_host(report.resource_id, changes) + result['ros_update_sent'] = True + result['next_actions'].append('ros_update_completed') + except Exception as e: + logger.warning(f"发送ROS2资源更新失败: {e}") + result['ros_update_sent'] = False + result['warnings'] = [f"ROS2更新失败: {str(e)}"] + + # 根据变更类型处理 + if report.change_type == 'created': + result['next_actions'].append('sync_to_global_registry') + self._handle_material_created(report) + + elif report.change_type == 'updated': + result['next_actions'].append('update_local_state') + self._handle_material_updated(report) + + elif report.change_type == 'moved': + result['next_actions'].append('update_location_tracking') + self._handle_material_moved(report) + + elif report.change_type == 'consumed': + result['next_actions'].append('update_inventory') + self._handle_material_consumed(report) + + elif report.change_type == 'completed': + result['next_actions'].append('trigger_next_workflow') + self._handle_material_completed(report) + + # 更新本地物料管理系统(如果存在) + if hasattr(self, 'material_management'): + try: + self.material_management.sync_external_material_change(report) + except Exception as e: + logger.warning(f"同步物料变更到本地管理系统失败: {e}") + + return result + + except Exception as e: + logger.error(f"处理物料变更报送失败: {e}") + return { + 'processed': False, + 'error': str(e), + 'next_actions': ['retry_processing'] + } + + def process_workflow_status_report(self, workstation_id: str, workflow_id: str, + status: str, data: Dict[str, Any]) -> Dict[str, Any]: + """处理工作流状态报送""" + try: + logger.info(f"处理工作流状态报送: {workstation_id} -> {workflow_id} ({status})") + + # 增加接收计数 + self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1 + + result = { + 'processed': True, + 'workflow_id': workflow_id, + 'status': status + } + + # 这里可以添加工作流状态同步逻辑 + # 例如:更新本地工作流状态、触发后续动作等 + + return result + + except Exception as e: + logger.error(f"处理工作流状态报送失败: {e}") + return {'processed': False, 'error': str(e)} + + # ============ 统一报送处理方法(基于LIMS协议规范) ============ + + def process_step_finish_report(self, request: WorkstationReportRequest) -> Dict[str, Any]: + """处理步骤完成报送(统一LIMS协议规范)- 同步到 ResourceTracker""" + try: + data = request.data + logger.info(f"处理步骤完成报送: {data['orderCode']} - {data['stepName']} (步骤ID: {data['stepId']})") + + # 增加接收计数 + self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1 + + # 同步步骤信息到 ResourceTracker + step_changes = { + 'order_code': data['orderCode'], + 'order_name': data.get('orderName', ''), + 'step_name': data['stepName'], + 'step_id': data['stepId'], + 'sample_id': data['sampleId'], + 'start_time': data['startTime'], + 'end_time': data['endTime'], + 'execution_status': data.get('executionStatus', 'completed'), + 'status': 'step_completed', + 'last_updated': request.request_time + } + + # 更新 ResourceTracker 中的样本状态 + sample_sync_success = False + if data['sampleId']: + sample_sync_success = self.resource_tracker.update_material_state( + data['sampleId'], + { + 'current_step': data['stepName'], + 'step_status': 'completed', + 'last_step_time': data['endTime'], + 'execution_status': data.get('executionStatus', 'completed') + }, + 'step_finished' + ) + + result = { + 'processed': True, + 'order_code': data['orderCode'], + 'step_id': data['stepId'], + 'step_name': data['stepName'], + 'sample_id': data['sampleId'], + 'start_time': data['startTime'], + 'end_time': data['endTime'], + 'execution_status': data.get('executionStatus', 'completed'), + 'next_actions': [], + 'sample_sync': sample_sync_success + } + + # 发送 ROS2 ResourceUpdate 到 host node + if sample_sync_success and data['sampleId']: + try: + self._send_resource_update_to_host(data['sampleId'], step_changes) + result['ros_update_sent'] = True + result['next_actions'].append('ros_step_update_completed') + except Exception as e: + logger.warning(f"发送ROS2步骤完成更新失败: {e}") + result['ros_update_sent'] = False + result['warnings'] = [f"ROS2更新失败: {str(e)}"] + + # 处理步骤完成逻辑 + try: + # 更新步骤状态 + result['next_actions'].append('update_step_status') + + # 检查是否触发后续步骤 + result['next_actions'].append('check_next_step') + + # 更新通量进度 + result['next_actions'].append('update_sample_progress') + + # 记录步骤完成事件 + self._record_step_completion(data) + + except Exception as e: + logger.warning(f"步骤完成处理过程中出现警告: {e}") + result['warnings'] = result.get('warnings', []) + [str(e)] + + return result + + except Exception as e: + logger.error(f"处理步骤完成报送失败: {e}") + return { + 'processed': False, + 'error': str(e), + 'next_actions': ['retry_processing'] + } + + def process_sample_finish_report(self, request: WorkstationReportRequest) -> Dict[str, Any]: + """处理通量完成报送(统一LIMS协议规范)""" + try: + data = request.data + logger.info(f"处理通量完成报送: {data['orderCode']} - 通量ID: {data['sampleId']} (状态: {data['Status']})") + + # 增加接收计数 + self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1 + + result = { + 'processed': True, + 'order_code': data['orderCode'], + 'sample_id': data['sampleId'], + 'status': data['Status'], + 'start_time': data['startTime'], + 'end_time': data['endTime'], + 'next_actions': [] + } + + # 根据通量状态处理 + status = int(data['Status']) + if status == 20: # 完成 + result['next_actions'].extend(['update_sample_completed', 'check_order_completion']) + self._record_sample_completion(data, 'completed') + elif status == -2: # 异常停止 + result['next_actions'].extend(['log_sample_error', 'trigger_error_handling']) + self._record_sample_completion(data, 'error') + elif status == -3: # 人工停止或取消 + result['next_actions'].extend(['log_sample_cancelled', 'update_order_status']) + self._record_sample_completion(data, 'cancelled') + elif status == 10: # 开始 + result['next_actions'].append('update_sample_started') + self._record_sample_start(data) + elif status == 2: # 进样 + result['next_actions'].append('update_sample_intake') + self._record_sample_intake(data) + + return result + + except Exception as e: + logger.error(f"处理通量完成报送失败: {e}") + return { + 'processed': False, + 'error': str(e), + 'next_actions': ['retry_processing'] + } + + def process_order_finish_report(self, request: WorkstationReportRequest, used_materials: List[MaterialUsage]) -> Dict[str, Any]: + """处理任务完成报送(统一LIMS协议规范)""" + try: + data = request.data + logger.info(f"处理任务完成报送: {data['orderCode']} - {data['orderName']} (状态: {data['status']})") + + # 增加接收计数 + self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1 + + result = { + 'processed': True, + 'order_code': data['orderCode'], + 'order_name': data['orderName'], + 'status': data['status'], + 'start_time': data['startTime'], + 'end_time': data['endTime'], + 'used_materials_count': len(used_materials), + 'next_actions': [] + } + + # 根据任务状态处理 + status = int(data['status']) + if status == 30: # 完成 + result['next_actions'].extend([ + 'update_order_completed', + 'process_material_usage', + 'generate_completion_report' + ]) + self._record_order_completion(data, used_materials, 'completed') + elif status == -11: # 异常停止 + result['next_actions'].extend([ + 'log_order_error', + 'trigger_error_handling', + 'process_partial_material_usage' + ]) + self._record_order_completion(data, used_materials, 'error') + elif status == -12: # 人工停止或取消 + result['next_actions'].extend([ + 'log_order_cancelled', + 'revert_material_reservations' + ]) + self._record_order_completion(data, used_materials, 'cancelled') + + # 处理物料使用记录 + if used_materials: + material_usage_result = self._process_material_usage(used_materials) + result['material_usage'] = material_usage_result + + return result + + except Exception as e: + logger.error(f"处理任务完成报送失败: {e}") + return { + 'processed': False, + 'error': str(e), + 'next_actions': ['retry_processing'] + } + + # ============ 具体的报送处理方法 ============ + + def _handle_material_created(self, report): + """处理物料创建报送""" + try: + # 已废弃的方法,保留用于兼容性 + logger.debug(f"处理物料创建: {getattr(report, 'resource_id', 'unknown')}") + except Exception as e: + logger.error(f"处理物料创建失败: {e}") + + def _handle_material_updated(self, report): + """处理物料更新报送""" + try: + logger.debug(f"处理物料更新: {getattr(report, 'resource_id', 'unknown')}") + except Exception as e: + logger.error(f"处理物料更新失败: {e}") + + def _handle_material_moved(self, report): + """处理物料移动报送""" + try: + logger.debug(f"处理物料移动: {getattr(report, 'resource_id', 'unknown')}") + except Exception as e: + logger.error(f"处理物料移动失败: {e}") + + def _handle_material_consumed(self, report): + """处理物料消耗报送""" + try: + logger.debug(f"处理物料消耗: {getattr(report, 'resource_id', 'unknown')}") + except Exception as e: + logger.error(f"处理物料消耗失败: {e}") + + def _handle_material_completed(self, report): + """处理物料完成报送""" + try: + logger.debug(f"处理物料完成: {getattr(report, 'resource_id', 'unknown')}") + except Exception as e: + logger.error(f"处理物料完成失败: {e}") + + # ============ 工作流控制接口 ============ + def handle_external_error(self, error_request): + """处理外部错误请求""" + try: + logger.error(f"收到外部错误处理请求: {getattr(error_request, 'error_type', 'unknown')}") + return { + 'success': True, + 'message': "错误已记录", + 'error_code': 'OK' + } + except Exception as e: + logger.error(f"处理外部错误失败: {e}") + return { + 'success': False, + 'message': f"错误处理失败: {str(e)}", + 'error_code': 'ERROR_HANDLING_FAILED' + } + + def _process_error_handling(self, error_request, error_record): + """处理具体的错误类型""" + return {'success': True, 'actions_taken': ['已转换为统一报送']} + """处理具体的错误类型""" + try: + result = {'success': True, 'actions_taken': []} + + # 1. 如果有特定动作ID,标记该动作失败 + if error_request.action_id: + self._mark_action_failed(error_request.action_id, error_request.error_message) + result['actions_taken'].append(f"标记动作 {error_request.action_id} 为失败") + + # 2. 如果有工作流ID,停止相关工作流 + if error_request.workflow_id: + self._handle_workflow_error(error_request.workflow_id, error_request.error_message) + result['actions_taken'].append(f"处理工作流 {error_request.workflow_id} 错误") + + # 3. 根据错误类型执行特定处理 + error_type = error_request.error_type.lower() + + if error_type in ['material_error', 'resource_error']: + # 物料相关错误 + material_result = self._handle_material_error(error_request) + result['actions_taken'].extend(material_result.get('actions', [])) + + elif error_type in ['device_error', 'communication_error']: + # 设备通信错误 + device_result = self._handle_device_error(error_request) + result['actions_taken'].extend(device_result.get('actions', [])) + + elif error_type in ['workflow_error', 'process_error']: + # 工作流程错误 + workflow_result = self._handle_process_error(error_request) + result['actions_taken'].extend(workflow_result.get('actions', [])) + + else: + # 通用错误处理 + result['actions_taken'].append("执行通用错误处理") + + # 4. 如果是严重错误,触发紧急停止 + if error_request.error_type.lower() in ['critical_error', 'safety_error', 'emergency']: + self._trigger_emergency_stop(error_request.error_message) + result['actions_taken'].append("触发紧急停止") + + result['message'] = "错误处理完成" + return result + + except Exception as e: + logger.error(f"错误处理过程失败: {e}") + return { + 'success': False, + 'message': f"错误处理过程失败: {str(e)}", + 'error_code': 'ERROR_PROCESSING_FAILED' + } + + def _mark_action_failed(self, action_id: str, error_message: str): + """标记指定动作为失败""" + try: + # 创建失败结果 + failed_result = { + 'success': False, + 'error': True, + 'error_message': error_message, + 'timestamp': time.time(), + 'marked_by_external_error': True + } + + # 存储到动作结果缓存 + self.action_results[action_id] = failed_result + + # 如果当前有正在执行的动作,更新其状态 + if self.current_action_context and self.current_action_context.get('action_id') == action_id: + self.current_action_context['failed'] = True + self.current_action_context['error_message'] = error_message + + logger.info(f"动作 {action_id} 已标记为失败: {error_message}") + + except Exception as e: + logger.error(f"标记动作失败时出错: {e}") + + def _handle_workflow_error(self, workflow_id: str, error_message: str): + """处理工作流错误""" + try: + # 如果是当前正在运行的工作流 + if (self.current_workflow_info and + self.current_workflow_info.get('id') == workflow_id): + + # 停止当前工作流 + self.stop_workflow(emergency=True) + logger.info(f"因外部错误停止工作流 {workflow_id}: {error_message}") + + except Exception as e: + logger.error(f"处理工作流错误失败: {e}") + + def _handle_material_error(self, error_request): + """处理物料相关错误(已废弃,请使用统一报送接口)""" + return {'success': True, 'message': '物料错误已记录'} + """处理物料相关错误""" + actions = [] + try: + # 可以触发物料重新扫描、位置重置等 + if error_request.context and 'resource_id' in error_request.context: + resource_id = error_request.context['resource_id'] + # 触发物料状态更新 + actions.append(f"更新物料 {resource_id} 状态") + + actions.append("执行物料错误恢复流程") + + except Exception as e: + logger.error(f"处理物料错误失败: {e}") + + return {'actions': actions} + + def _handle_device_error(self, error_request): + """处理设备相关错误(已废弃,请使用统一报送接口)""" + return {'success': True, 'message': '设备错误已记录'} + """处理设备错误""" + actions = [] + try: + if error_request.device_id: + # 重置设备连接 + actions.append(f"重置设备 {error_request.device_id} 连接") + + # 如果是通信设备,重新建立连接 + if error_request.device_id in self.communication_devices: + actions.append(f"重新建立通信设备 {error_request.device_id} 连接") + + actions.append("执行设备错误恢复流程") + + except Exception as e: + logger.error(f"处理设备错误失败: {e}") + + return {'actions': actions} + + def _handle_process_error(self, error_request): + """处理流程相关错误(已废弃,请使用统一报送接口)""" + return {'success': True, 'message': '流程错误已记录'} + """处理工作流程错误""" + actions = [] + try: + # 暂停当前工作流 + if self.current_workflow_status not in [WorkflowStatus.IDLE, WorkflowStatus.STOPPED]: + actions.append("暂停当前工作流") + + actions.append("执行工作流程错误恢复") + + except Exception as e: + logger.error(f"处理工作流程错误失败: {e}") + + return {'actions': actions} + + def _trigger_emergency_stop(self, reason: str): + """触发紧急停止""" + try: + logger.critical(f"触发紧急停止: {reason}") + + # 停止所有工作流 + self.stop_workflow(emergency=True) + + # 设置错误状态 + self.current_workflow_status = WorkflowStatus.ERROR + + # 可以在这里添加更多紧急停止逻辑 + # 例如:断开设备连接、保存当前状态等 + + except Exception as e: + logger.error(f"执行紧急停止失败: {e}") + # ============ 工作流控制接口 ============ def _handle_start_workflow(self, request, response): @@ -1298,5 +1836,199 @@ class WorkstationBase(ROS2ProtocolNode, ABC): "active_workflows": self.active_dynamic_workflows, "total_resources": self.workstation_resource_count, "communication_status": self.communication_status, - "material_status": self.material_status + "material_status": self.material_status, + "http_service_running": self.http_service.is_running if self.http_service else False } + + # ============ 增强动作执行 - 支持错误处理和追踪 ============ + + async def execute_single_action(self, device_id, action_name, action_kwargs): + """执行单个动作 - 增强版,支持错误处理和动作追踪""" + # 构建动作ID + if device_id in ["", None, "self"]: + action_id = f"/devices/{self.device_id}/{action_name}" + else: + action_id = f"/devices/{device_id}/{action_name}" + + # 设置动作上下文 + self.current_action_context = { + 'action_id': action_id, + 'device_id': device_id, + 'action_name': action_name, + 'action_kwargs': action_kwargs, + 'start_time': time.time(), + 'failed': False, + 'error_message': None + } + + try: + # 检查是否已被外部标记为失败 + if action_id in self.action_results: + cached_result = self.action_results[action_id] + if cached_result.get('marked_by_external_error'): + logger.warning(f"动作 {action_id} 已被外部标记为失败") + return self._create_failed_result(cached_result['error_message']) + + # 检查动作客户端是否存在 + if action_id not in self._action_clients: + error_msg = f"找不到动作客户端: {action_id}" + self.lab_logger().error(error_msg) + return self._create_failed_result(error_msg) + + # 发送动作请求 + action_client = self._action_clients[action_id] + goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs) + + self.lab_logger().debug(f"发送动作请求到: {action_id}") + action_client.wait_for_server() + + # 等待动作完成 + request_future = action_client.send_goal_async(goal_msg) + handle = await request_future + + if not handle.accepted: + error_msg = f"动作请求被拒绝: {action_name}" + self.lab_logger().error(error_msg) + return self._create_failed_result(error_msg) + + # 在执行过程中检查是否被外部标记为失败 + result_future = await handle.get_result_async() + + # 再次检查是否在执行过程中被标记为失败 + if self.current_action_context.get('failed'): + error_msg = self.current_action_context.get('error_message', '动作被外部标记为失败') + logger.warning(f"动作 {action_id} 在执行过程中被标记为失败: {error_msg}") + return self._create_failed_result(error_msg) + + result = result_future.result + + # 存储成功结果 + self.action_results[action_id] = { + 'success': True, + 'result': result, + 'timestamp': time.time(), + 'execution_time': time.time() - self.current_action_context['start_time'] + } + + self.lab_logger().debug(f"动作完成: {action_name}") + return result + + except Exception as e: + error_msg = f"动作执行异常: {str(e)}" + logger.error(f"执行动作 {action_id} 失败: {e}\n{traceback.format_exc()}") + return self._create_failed_result(error_msg) + + finally: + # 清理动作上下文 + self.current_action_context = None + + def _create_failed_result(self, error_message: str): + """创建失败结果对象""" + # 这需要根据具体的动作类型来创建相应的结果对象 + # 这里返回一个通用的失败标识 + class FailedResult: + def __init__(self, error_msg): + self.success = False + self.return_info = json.dumps({ + "suc": False, + "error": True, + "error_message": error_msg, + "timestamp": time.time() + }) + + return FailedResult(error_message) + + def __del__(self): + """析构函数 - 清理HTTP服务""" + try: + self._stop_http_service() + self._stop_reporting_service() + except: + pass + + # ============ LIMS辅助方法 ============ + + def _record_step_completion(self, step_data: Dict[str, Any]): + """记录步骤完成事件""" + try: + logger.debug(f"记录步骤完成: {step_data['stepName']} - {step_data['stepId']}") + # 这里可以添加步骤完成的记录逻辑 + # 例如:更新数据库、发送通知等 + except Exception as e: + logger.error(f"记录步骤完成失败: {e}") + + def _record_sample_completion(self, sample_data: Dict[str, Any], completion_type: str): + """记录通量完成事件""" + try: + logger.debug(f"记录通量完成: {sample_data['sampleId']} - {completion_type}") + # 这里可以添加通量完成的记录逻辑 + except Exception as e: + logger.error(f"记录通量完成失败: {e}") + + def _record_sample_start(self, sample_data: Dict[str, Any]): + """记录通量开始事件""" + try: + logger.debug(f"记录通量开始: {sample_data['sampleId']}") + # 这里可以添加通量开始的记录逻辑 + except Exception as e: + logger.error(f"记录通量开始失败: {e}") + + def _record_sample_intake(self, sample_data: Dict[str, Any]): + """记录通量进样事件""" + try: + logger.debug(f"记录通量进样: {sample_data['sampleId']}") + # 这里可以添加通量进样的记录逻辑 + except Exception as e: + logger.error(f"记录通量进样失败: {e}") + + def _record_order_completion(self, order_data: Dict[str, Any], used_materials: List, completion_type: str): + """记录任务完成事件""" + try: + logger.debug(f"记录任务完成: {order_data['orderCode']} - {completion_type}") + # 这里可以添加任务完成的记录逻辑 + # 包括物料使用记录的处理 + except Exception as e: + logger.error(f"记录任务完成失败: {e}") + + def _process_material_usage(self, used_materials: List) -> Dict[str, Any]: + """处理物料使用记录""" + try: + logger.debug(f"处理物料使用记录: {len(used_materials)} 条") + + processed_materials = [] + for material in used_materials: + material_record = { + 'material_id': material.materialId, + 'location_id': material.locationId, + 'type_mode': material.typeMode, + 'used_quantity': material.usedQuantity, + 'processed_time': time.time() + } + processed_materials.append(material_record) + + # 更新库存 + self._update_material_inventory(material) + + return { + 'processed_count': len(processed_materials), + 'materials': processed_materials, + 'success': True + } + + except Exception as e: + logger.error(f"处理物料使用记录失败: {e}") + return { + 'processed_count': 0, + 'materials': [], + 'success': False, + 'error': str(e) + } + + def _update_material_inventory(self, material): + """更新物料库存""" + try: + # 这里可以添加库存更新逻辑 + # 例如:调用库存管理系统API、更新本地缓存等 + logger.debug(f"更新物料库存: {material.materialId} - 使用量: {material.usedQuantity}") + except Exception as e: + logger.error(f"更新物料库存失败: {e}") diff --git a/unilabos/device_comms/workstation_http_service.py b/unilabos/device_comms/workstation_http_service.py new file mode 100644 index 00000000..3805d2ce --- /dev/null +++ b/unilabos/device_comms/workstation_http_service.py @@ -0,0 +1,605 @@ +""" +工作站HTTP服务模块 +Workstation HTTP Service Module + +统一的工作站报送接收服务,基于LIMS协议规范: +1. 步骤完成报送 - POST /report/step_finish +2. 通量完成报送 - POST /report/sample_finish +3. 任务完成报送 - POST /report/order_finish +4. 批量更新报送 - POST /report/batch_update +5. 物料变更报送 - POST /report/material_change +6. 错误处理报送 - POST /report/error_handling +7. 健康检查和状态查询 + +统一使用LIMS协议字段规范,简化接口避免功能重复 +""" +import json +import threading +import time +import traceback +from typing import Dict, Any, Optional, List +from http.server import BaseHTTPRequestHandler, HTTPServer +from urllib.parse import urlparse +from dataclasses import dataclass, asdict +from datetime import datetime + +from unilabos.utils.log import logger + + +@dataclass +class WorkstationReportRequest: + """统一工作站报送请求(基于LIMS协议规范)""" + token: str # 授权令牌 + request_time: str # 请求时间,格式:2024-12-12 12:12:12.xxx + data: Dict[str, Any] # 报送数据 + + +@dataclass +class MaterialUsage: + """物料使用记录""" + materialId: str # 物料Id(GUID) + locationId: str # 库位Id(GUID) + typeMode: str # 物料类型(样品1、试剂2、耗材0) + usedQuantity: float # 使用的数量(数字) + + +@dataclass +class HttpResponse: + """HTTP响应""" + success: bool + message: str + data: Optional[Dict[str, Any]] = None + acknowledgment_id: Optional[str] = None + + +class WorkstationHTTPHandler(BaseHTTPRequestHandler): + """工作站HTTP请求处理器""" + + def __init__(self, workstation_instance, *args, **kwargs): + self.workstation = workstation_instance + super().__init__(*args, **kwargs) + + def do_POST(self): + """处理POST请求 - 统一的工作站报送接口""" + try: + # 解析请求路径 + parsed_path = urlparse(self.path) + endpoint = parsed_path.path + + # 读取请求体 + content_length = int(self.headers.get('Content-Length', 0)) + if content_length > 0: + post_data = self.rfile.read(content_length) + request_data = json.loads(post_data.decode('utf-8')) + else: + request_data = {} + + logger.info(f"收到工作站报送: {endpoint} - {request_data.get('token', 'unknown')}") + + # 统一的报送端点路由(基于LIMS协议规范) + if endpoint == '/report/step_finish': + response = self._handle_step_finish_report(request_data) + elif endpoint == '/report/sample_finish': + response = self._handle_sample_finish_report(request_data) + elif endpoint == '/report/order_finish': + response = self._handle_order_finish_report(request_data) + elif endpoint == '/report/batch_update': + response = self._handle_batch_update_report(request_data) + # 扩展报送端点 + elif endpoint == '/report/material_change': + response = self._handle_material_change_report(request_data) + elif endpoint == '/report/error_handling': + response = self._handle_error_handling_report(request_data) + # 保留LIMS协议端点以兼容现有系统 + elif endpoint == '/LIMS/step_finish': + response = self._handle_step_finish_report(request_data) + elif endpoint == '/LIMS/preintake_finish': + response = self._handle_sample_finish_report(request_data) + elif endpoint == '/LIMS/order_finish': + response = self._handle_order_finish_report(request_data) + else: + response = HttpResponse( + success=False, + message=f"不支持的报送端点: {endpoint}", + data={"supported_endpoints": [ + "/report/step_finish", + "/report/sample_finish", + "/report/order_finish", + "/report/batch_update", + "/report/material_change", + "/report/error_handling" + ]} + ) + + # 发送响应 + self._send_response(response) + + except Exception as e: + logger.error(f"处理工作站报送失败: {e}\\n{traceback.format_exc()}") + error_response = HttpResponse( + success=False, + message=f"请求处理失败: {str(e)}" + ) + self._send_response(error_response) + + def do_GET(self): + """处理GET请求 - 健康检查和状态查询""" + try: + parsed_path = urlparse(self.path) + endpoint = parsed_path.path + + if endpoint == '/status': + response = self._handle_status_check() + elif endpoint == '/health': + response = HttpResponse(success=True, message="服务健康") + else: + response = HttpResponse( + success=False, + message=f"不支持的查询端点: {endpoint}", + data={"supported_endpoints": ["/status", "/health"]} + ) + + self._send_response(response) + + except Exception as e: + logger.error(f"GET请求处理失败: {e}") + error_response = HttpResponse( + success=False, + message=f"GET请求处理失败: {str(e)}" + ) + self._send_response(error_response) + + def _handle_step_finish_report(self, request_data: Dict[str, Any]) -> HttpResponse: + """处理步骤完成报送(统一LIMS协议规范)""" + try: + # 验证基本字段 + required_fields = ['token', 'request_time', 'data'] + if missing_fields := [field for field in required_fields if field not in request_data]: + return HttpResponse( + success=False, + message=f"缺少必要字段: {', '.join(missing_fields)}" + ) + + # 验证data字段内容 + data = request_data['data'] + data_required_fields = ['orderCode', 'orderName', 'stepName', 'stepId', 'sampleId', 'startTime', 'endTime'] + if data_missing_fields := [field for field in data_required_fields if field not in data]: + return HttpResponse( + success=False, + message=f"data字段缺少必要内容: {', '.join(data_missing_fields)}" + ) + + # 创建统一请求对象 + report_request = WorkstationReportRequest( + token=request_data['token'], + request_time=request_data['request_time'], + data=data + ) + + # 调用工作站处理方法 + result = self.workstation.process_step_finish_report(report_request) + + return HttpResponse( + success=True, + message=f"步骤完成报送已处理: {data['stepName']} ({data['orderCode']})", + acknowledgment_id=f"STEP_{int(time.time() * 1000)}_{data['stepId']}", + data=result + ) + + except Exception as e: + logger.error(f"处理步骤完成报送失败: {e}") + return HttpResponse( + success=False, + message=f"步骤完成报送处理失败: {str(e)}" + ) + + def _handle_sample_finish_report(self, request_data: Dict[str, Any]) -> HttpResponse: + """处理通量完成报送(统一LIMS协议规范)""" + try: + # 验证基本字段 + required_fields = ['token', 'request_time', 'data'] + if missing_fields := [field for field in required_fields if field not in request_data]: + return HttpResponse( + success=False, + message=f"缺少必要字段: {', '.join(missing_fields)}" + ) + + # 验证data字段内容 + data = request_data['data'] + data_required_fields = ['orderCode', 'orderName', 'sampleId', 'startTime', 'endTime', 'Status'] + if data_missing_fields := [field for field in data_required_fields if field not in data]: + return HttpResponse( + success=False, + message=f"data字段缺少必要内容: {', '.join(data_missing_fields)}" + ) + + # 创建统一请求对象 + report_request = WorkstationReportRequest( + token=request_data['token'], + request_time=request_data['request_time'], + data=data + ) + + # 调用工作站处理方法 + result = self.workstation.process_sample_finish_report(report_request) + + status_names = { + "0": "待生产", "2": "进样", "10": "开始", + "20": "完成", "-2": "异常停止", "-3": "人工停止" + } + status_desc = status_names.get(str(data['Status']), f"状态{data['Status']}") + + return HttpResponse( + success=True, + message=f"通量完成报送已处理: {data['sampleId']} ({data['orderCode']}) - {status_desc}", + acknowledgment_id=f"SAMPLE_{int(time.time() * 1000)}_{data['sampleId']}", + data=result + ) + + except Exception as e: + logger.error(f"处理通量完成报送失败: {e}") + return HttpResponse( + success=False, + message=f"通量完成报送处理失败: {str(e)}" + ) + + def _handle_order_finish_report(self, request_data: Dict[str, Any]) -> HttpResponse: + """处理任务完成报送(统一LIMS协议规范)""" + try: + # 验证基本字段 + required_fields = ['token', 'request_time', 'data'] + if missing_fields := [field for field in required_fields if field not in request_data]: + return HttpResponse( + success=False, + message=f"缺少必要字段: {', '.join(missing_fields)}" + ) + + # 验证data字段内容 + data = request_data['data'] + data_required_fields = ['orderCode', 'orderName', 'startTime', 'endTime', 'status'] + if data_missing_fields := [field for field in data_required_fields if field not in data]: + return HttpResponse( + success=False, + message=f"data字段缺少必要内容: {', '.join(data_missing_fields)}" + ) + + # 处理物料使用记录 + used_materials = [] + if 'usedMaterials' in data: + for material_data in data['usedMaterials']: + material = MaterialUsage( + materialId=material_data.get('materialId', ''), + locationId=material_data.get('locationId', ''), + typeMode=material_data.get('typeMode', ''), + usedQuantity=material_data.get('usedQuantity', 0.0) + ) + used_materials.append(material) + + # 创建统一请求对象 + report_request = WorkstationReportRequest( + token=request_data['token'], + request_time=request_data['request_time'], + data=data + ) + + # 调用工作站处理方法 + result = self.workstation.process_order_finish_report(report_request, used_materials) + + status_names = {"30": "完成", "-11": "异常停止", "-12": "人工停止"} + status_desc = status_names.get(str(data['status']), f"状态{data['status']}") + + return HttpResponse( + success=True, + message=f"任务完成报送已处理: {data['orderName']} ({data['orderCode']}) - {status_desc}", + acknowledgment_id=f"ORDER_{int(time.time() * 1000)}_{data['orderCode']}", + data=result + ) + + except Exception as e: + logger.error(f"处理任务完成报送失败: {e}") + return HttpResponse( + success=False, + message=f"任务完成报送处理失败: {str(e)}" + ) + + def _handle_batch_update_report(self, request_data: Dict[str, Any]) -> HttpResponse: + """处理批量报送""" + try: + step_updates = request_data.get('step_updates', []) + sample_updates = request_data.get('sample_updates', []) + order_updates = request_data.get('order_updates', []) + + results = { + 'step_results': [], + 'sample_results': [], + 'order_results': [], + 'total_processed': 0, + 'total_failed': 0 + } + + # 处理批量步骤更新 + for step_data in step_updates: + try: + step_data['token'] = request_data.get('token', step_data.get('token')) + step_data['request_time'] = request_data.get('request_time', step_data.get('request_time')) + result = self._handle_step_finish_report(step_data) + results['step_results'].append(result) + if result.success: + results['total_processed'] += 1 + else: + results['total_failed'] += 1 + except Exception as e: + results['step_results'].append(HttpResponse(success=False, message=str(e))) + results['total_failed'] += 1 + + # 处理批量通量更新 + for sample_data in sample_updates: + try: + sample_data['token'] = request_data.get('token', sample_data.get('token')) + sample_data['request_time'] = request_data.get('request_time', sample_data.get('request_time')) + result = self._handle_sample_finish_report(sample_data) + results['sample_results'].append(result) + if result.success: + results['total_processed'] += 1 + else: + results['total_failed'] += 1 + except Exception as e: + results['sample_results'].append(HttpResponse(success=False, message=str(e))) + results['total_failed'] += 1 + + # 处理批量任务更新 + for order_data in order_updates: + try: + order_data['token'] = request_data.get('token', order_data.get('token')) + order_data['request_time'] = request_data.get('request_time', order_data.get('request_time')) + result = self._handle_order_finish_report(order_data) + results['order_results'].append(result) + if result.success: + results['total_processed'] += 1 + else: + results['total_failed'] += 1 + except Exception as e: + results['order_results'].append(HttpResponse(success=False, message=str(e))) + results['total_failed'] += 1 + + return HttpResponse( + success=results['total_failed'] == 0, + message=f"批量报送处理完成: {results['total_processed']} 成功, {results['total_failed']} 失败", + acknowledgment_id=f"BATCH_{int(time.time() * 1000)}", + data=results + ) + + except Exception as e: + logger.error(f"处理批量报送失败: {e}") + return HttpResponse( + success=False, + message=f"批量报送处理失败: {str(e)}" + ) + + def _handle_material_change_report(self, request_data: Dict[str, Any]) -> HttpResponse: + """处理物料变更报送""" + try: + # 验证必需字段 + required_fields = ['workstation_id', 'timestamp', 'resource_id', 'change_type'] + if missing_fields := [field for field in required_fields if field not in request_data]: + return HttpResponse( + success=False, + message=f"缺少必要字段: {', '.join(missing_fields)}" + ) + + # 调用工作站的处理方法 + result = self.workstation.process_material_change_report(request_data) + + return HttpResponse( + success=True, + message=f"物料变更报送已处理: {request_data['resource_id']} ({request_data['change_type']})", + acknowledgment_id=f"MATERIAL_{int(time.time() * 1000)}_{request_data['resource_id']}", + data=result + ) + + except Exception as e: + logger.error(f"处理物料变更报送失败: {e}") + return HttpResponse( + success=False, + message=f"物料变更报送处理失败: {str(e)}" + ) + + def _handle_error_handling_report(self, request_data: Dict[str, Any]) -> HttpResponse: + """处理错误处理报送""" + try: + # 验证必需字段 + required_fields = ['workstation_id', 'timestamp', 'error_type', 'error_message'] + if missing_fields := [field for field in required_fields if field not in request_data]: + return HttpResponse( + success=False, + message=f"缺少必要字段: {', '.join(missing_fields)}" + ) + + # 调用工作站的处理方法 + result = self.workstation.handle_external_error(request_data) + + return HttpResponse( + success=True, + message=f"错误处理报送已处理: {request_data['error_type']} - {request_data['error_message']}", + acknowledgment_id=f"ERROR_{int(time.time() * 1000)}_{request_data.get('action_id', 'unknown')}", + data=result + ) + + except Exception as e: + logger.error(f"处理错误处理报送失败: {e}") + return HttpResponse( + success=False, + message=f"错误处理报送处理失败: {str(e)}" + ) + + def _handle_status_check(self) -> HttpResponse: + """处理状态查询""" + try: + return HttpResponse( + success=True, + message="工作站报送服务正常运行", + data={ + "workstation_id": self.workstation.device_id, + "service_type": "unified_reporting_service", + "uptime": time.time() - getattr(self.workstation, '_start_time', time.time()), + "reports_received": getattr(self.workstation, '_reports_received_count', 0), + "supported_endpoints": [ + "POST /report/step_finish", + "POST /report/sample_finish", + "POST /report/order_finish", + "POST /report/batch_update", + "POST /report/material_change", + "POST /report/error_handling", + "GET /status", + "GET /health" + ] + } + ) + except Exception as e: + logger.error(f"处理状态查询失败: {e}") + return HttpResponse( + success=False, + message=f"状态查询失败: {str(e)}" + ) + + def _send_response(self, response: HttpResponse): + """发送响应""" + try: + # 设置响应状态码 + status_code = 200 if response.success else 400 + self.send_response(status_code) + + # 设置响应头 + self.send_header('Content-Type', 'application/json; charset=utf-8') + self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') + self.send_header('Access-Control-Allow-Headers', 'Content-Type') + self.end_headers() + + # 发送响应体 + response_json = json.dumps(asdict(response), ensure_ascii=False, indent=2) + self.wfile.write(response_json.encode('utf-8')) + + except Exception as e: + logger.error(f"发送响应失败: {e}") + + def log_message(self, format, *args): + """重写日志方法""" + logger.debug(f"HTTP请求: {format % args}") + + +class WorkstationHTTPService: + """工作站HTTP服务""" + + def __init__(self, workstation_instance, host: str = "127.0.0.1", port: int = 8080): + self.workstation = workstation_instance + self.host = host + self.port = port + self.server = None + self.server_thread = None + self.running = False + + # 初始化统计信息 + self.workstation._start_time = time.time() + self.workstation._reports_received_count = 0 + + def start(self): + """启动HTTP服务""" + try: + # 创建处理器工厂函数 + def handler_factory(*args, **kwargs): + return WorkstationHTTPHandler(self.workstation, *args, **kwargs) + + # 创建HTTP服务器 + self.server = HTTPServer((self.host, self.port), handler_factory) + + # 在单独线程中运行服务器 + self.server_thread = threading.Thread( + target=self._run_server, + daemon=True, + name=f"WorkstationHTTP-{self.workstation.device_id}" + ) + + self.running = True + self.server_thread.start() + + logger.info(f"工作站HTTP报送服务已启动: http://{self.host}:{self.port}") + logger.info("统一的报送端点 (基于LIMS协议规范):") + logger.info(" - POST /report/step_finish # 步骤完成报送") + logger.info(" - POST /report/sample_finish # 通量完成报送") + logger.info(" - POST /report/order_finish # 任务完成报送") + logger.info(" - POST /report/batch_update # 批量更新报送") + logger.info("扩展报送端点:") + logger.info(" - POST /report/material_change # 物料变更报送") + logger.info(" - POST /report/error_handling # 错误处理报送") + logger.info("兼容端点:") + logger.info(" - POST /LIMS/step_finish # 兼容LIMS步骤完成") + logger.info(" - POST /LIMS/preintake_finish # 兼容LIMS通量完成") + logger.info(" - POST /LIMS/order_finish # 兼容LIMS任务完成") + logger.info("服务端点:") + logger.info(" - GET /status # 服务状态查询") + logger.info(" - GET /health # 健康检查") + + except Exception as e: + logger.error(f"启动HTTP服务失败: {e}") + raise + + def stop(self): + """停止HTTP服务""" + try: + if self.running and self.server: + self.running = False + self.server.shutdown() + self.server.server_close() + + if self.server_thread and self.server_thread.is_alive(): + self.server_thread.join(timeout=5.0) + + logger.info("工作站HTTP报送服务已停止") + + except Exception as e: + logger.error(f"停止HTTP服务失败: {e}") + + def _run_server(self): + """运行HTTP服务器""" + try: + while self.running: + self.server.handle_request() + except Exception as e: + if self.running: # 只在非正常停止时记录错误 + logger.error(f"HTTP服务运行错误: {e}") + + @property + def is_running(self) -> bool: + """检查服务是否正在运行""" + return self.running and self.server_thread and self.server_thread.is_alive() + + @property + def service_url(self) -> str: + """获取服务URL""" + return f"http://{self.host}:{self.port}" + + +# 导出主要类 - 保持向后兼容 +@dataclass +class MaterialChangeReport: + """已废弃:物料变更报送,请使用统一的WorkstationReportRequest""" + pass + + +@dataclass +class TaskExecutionReport: + """已废弃:任务执行报送,请使用统一的WorkstationReportRequest""" + pass + + +# 导出列表 +__all__ = [ + 'WorkstationReportRequest', + 'MaterialUsage', + 'HttpResponse', + 'WorkstationHTTPService', + # 向后兼容 + 'MaterialChangeReport', + 'TaskExecutionReport' +] diff --git a/unilabos/ros/nodes/presets/protocol_node.py b/unilabos/ros/nodes/presets/protocol_node.py index 23462142..c0ed6849 100644 --- a/unilabos/ros/nodes/presets/protocol_node.py +++ b/unilabos/ros/nodes/presets/protocol_node.py @@ -43,6 +43,7 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): children: dict, protocol_type: Union[str, list[str]], resource_tracker: DeviceNodeResourceTracker, + workstation_config: dict = None, # 新增:工作站配置 *args, **kwargs, ): @@ -50,6 +51,8 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): # 初始化其它属性 self.children = children + self.workstation_config = workstation_config or {} # 新增:保存工作站配置 + self.communication_interfaces = self.workstation_config.get('communication_interfaces', {}) # 从工作站配置获取通信接口 self._busy = False self.sub_devices = {} self._goals = {} @@ -69,57 +72,135 @@ class ROS2ProtocolNode(BaseROS2DeviceNode): # 初始化子设备 self.communication_node_id_to_instance = {} + self._initialize_child_devices() + + # 设置硬件接口代理 + self._setup_hardware_proxies() + self.lab_logger().info(f"ROS2ProtocolNode {device_id} initialized with protocols: {self.protocol_names}") + + def _initialize_child_devices(self): + """初始化子设备 - 重构为更清晰的方法""" + # 设备分类字典 - 统一管理 + self.communication_devices = {} + self.logical_devices = {} + for device_id, device_config in self.children.items(): if device_config.get("type", "device") != "device": self.lab_logger().debug( f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping." ) continue + try: d = self.initialize_device(device_id, device_config) + if d is None: + continue + + # 统一的设备分类逻辑 + device_type = device_config.get("device_type", "logical") + + # 兼容旧的ID匹配方式和新的配置方式 + if device_type == "communication" or "serial_" in device_id or "io_" in device_id: + self.communication_node_id_to_instance[device_id] = d # 保持向后兼容 + self.communication_devices[device_id] = d # 新的统一方式 + self.lab_logger().info(f"通信设备 {device_id} 初始化并分类成功") + elif device_type == "logical": + self.logical_devices[device_id] = d + self.lab_logger().info(f"逻辑设备 {device_id} 初始化并分类成功") + else: + # 默认作为逻辑设备处理 + self.logical_devices[device_id] = d + self.lab_logger().info(f"设备 {device_id} 作为逻辑设备处理") + except Exception as ex: self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}") - d = None - if d is None: - continue - - if "serial_" in device_id or "io_" in device_id: - self.communication_node_id_to_instance[device_id] = d - continue + def _setup_hardware_proxies(self): + """设置硬件接口代理 - 重构为独立方法,支持工作站配置""" + # 1. 传统的协议节点硬件代理设置 for device_id, device_config in self.children.items(): if device_config.get("type", "device") != "device": continue + # 设置硬件接口代理 if device_id not in self.sub_devices: self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...") continue + d = self.sub_devices[device_id] if d: - hardware_interface = d.ros_node_instance._hardware_interface - if ( - hasattr(d.driver_instance, hardware_interface["name"]) - and hasattr(d.driver_instance, hardware_interface["write"]) - and (hardware_interface["read"] is None or hasattr(d.driver_instance, hardware_interface["read"])) - ): + self._setup_device_hardware_proxy(device_id, d) + + # 2. 工作站配置的通信接口代理设置 + if hasattr(self, 'communication_interfaces') and self.communication_interfaces: + self._setup_workstation_communication_interfaces() - name = getattr(d.driver_instance, hardware_interface["name"]) - read = hardware_interface.get("read", None) - write = hardware_interface.get("write", None) + self.lab_logger().info(f"ROS2ProtocolNode {self.device_id} initialized with protocols: {self.protocol_names}") - # 如果硬件接口是字符串,通过通信设备提供 - if isinstance(name, str) and name in self.sub_devices: - communicate_device = self.sub_devices[name] - communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface - self._setup_hardware_proxy(d, self.sub_devices[name], read, write) - self.lab_logger().info( - f"\n通信代理:为子设备{device_id}\n " - f"添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n " - f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})" - ) + def _setup_workstation_communication_interfaces(self): + """设置工作站特定的通信接口代理""" + for logical_device_id, logical_device in self.logical_devices.items(): + # 检查是否有配置的通信接口 + interface_config = getattr(self, 'communication_interfaces', {}).get(logical_device_id) + if not interface_config: + continue + + comm_device = self.communication_devices.get(interface_config.device_id) + if not comm_device: + self.lab_logger().error(f"通信设备 {interface_config.device_id} 不存在") + continue + + # 设置工作站级别的通信代理 + self._setup_workstation_hardware_proxy( + logical_device, + comm_device, + interface_config + ) - self.lab_logger().info(f"ROS2ProtocolNode {device_id} initialized with protocols: {self.protocol_names}") + def _setup_workstation_hardware_proxy(self, logical_device, comm_device, interface_config): + """为逻辑设备设置工作站级通信代理""" + try: + # 获取通信设备的读写方法 + read_func = getattr(comm_device.driver_instance, interface_config.read_method, None) + write_func = getattr(comm_device.driver_instance, interface_config.write_method, None) + + if read_func: + setattr(logical_device.driver_instance, 'comm_read', read_func) + if write_func: + setattr(logical_device.driver_instance, 'comm_write', write_func) + + # 设置通信配置 + setattr(logical_device.driver_instance, 'comm_config', interface_config.config) + setattr(logical_device.driver_instance, 'comm_protocol', interface_config.protocol_type) + + self.lab_logger().info(f"为逻辑设备 {logical_device.device_id} 设置工作站通信代理 -> {comm_device.device_id}") + + except Exception as e: + self.lab_logger().error(f"设置工作站通信代理失败: {e}") + + def _setup_device_hardware_proxy(self, device_id: str, device): + """为单个设备设置硬件接口代理""" + hardware_interface = device.ros_node_instance._hardware_interface + if ( + hasattr(device.driver_instance, hardware_interface["name"]) + and hasattr(device.driver_instance, hardware_interface["write"]) + and (hardware_interface["read"] is None or hasattr(device.driver_instance, hardware_interface["read"])) + ): + name = getattr(device.driver_instance, hardware_interface["name"]) + read = hardware_interface.get("read", None) + write = hardware_interface.get("write", None) + + # 如果硬件接口是字符串,通过通信设备提供 + if isinstance(name, str) and name in self.sub_devices: + communicate_device = self.sub_devices[name] + communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface + self._setup_hardware_proxy(device, self.sub_devices[name], read, write) + self.lab_logger().info( + f"\n通信代理:为子设备{device_id}\n " + f"添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n " + f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})" + ) def _setup_protocol_names(self, protocol_type): # 处理协议类型