update workstation base

This commit is contained in:
Junhan Chang
2025-08-21 10:05:58 +08:00
parent 227ff1284a
commit 9f823a4198
3 changed files with 1508 additions and 90 deletions

View File

@@ -25,6 +25,9 @@ from unilabos.ros.nodes.presets.protocol_node import ROS2ProtocolNode
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker
from unilabos.device_comms.workstation_communication import WorkstationCommunicationBase, CommunicationConfig from unilabos.device_comms.workstation_communication import WorkstationCommunicationBase, CommunicationConfig
from unilabos.device_comms.workstation_material_management import MaterialManagementBase from unilabos.device_comms.workstation_material_management import MaterialManagementBase
from unilabos.device_comms.workstation_http_service import (
WorkstationHTTPService, WorkstationReportRequest, MaterialUsage
)
from unilabos.ros.msgs.message_converter import convert_to_ros_msg, convert_from_ros_msg from unilabos.ros.msgs.message_converter import convert_to_ros_msg, convert_from_ros_msg
from unilabos.utils.log import logger from unilabos.utils.log import logger
from unilabos.utils.type_check import serialize_result_info from unilabos.utils.type_check import serialize_result_info
@@ -114,6 +117,7 @@ class WorkstationBase(ROS2ProtocolNode, ABC):
communication_config: CommunicationConfig, communication_config: CommunicationConfig,
deck_config: Optional[Dict[str, Any]] = None, deck_config: Optional[Dict[str, Any]] = None,
communication_interfaces: Optional[Dict[str, CommunicationInterface]] = None, communication_interfaces: Optional[Dict[str, CommunicationInterface]] = None,
http_service_config: Optional[Dict[str, Any]] = None, # 新增HTTP服务配置
*args, *args,
**kwargs, **kwargs,
): ):
@@ -122,6 +126,18 @@ class WorkstationBase(ROS2ProtocolNode, ABC):
self.deck_config = deck_config or {"size_x": 1000.0, "size_y": 1000.0, "size_z": 500.0} self.deck_config = deck_config or {"size_x": 1000.0, "size_y": 1000.0, "size_z": 500.0}
self.communication_interfaces = communication_interfaces or {} self.communication_interfaces = communication_interfaces or {}
# HTTP服务配置 - 现在专门用于报送接收
self.http_service_config = http_service_config or {
"enabled": True,
"host": "127.0.0.1",
"port": 8081 # 默认使用8081端口作为报送接收服务
}
# 错误处理和动作追踪
self.current_action_context = None # 当前正在执行的动作上下文
self.error_history = [] # 错误历史记录
self.action_results = {} # 动作结果缓存
# 工作流状态 - 支持静态和动态工作流 # 工作流状态 - 支持静态和动态工作流
self.current_workflow_status = WorkflowStatus.IDLE self.current_workflow_status = WorkflowStatus.IDLE
self.current_workflow_info = None self.current_workflow_info = None
@@ -135,28 +151,27 @@ class WorkstationBase(ROS2ProtocolNode, ABC):
self.registered_workflows: Dict[str, WorkflowDefinition] = {} self.registered_workflows: Dict[str, WorkflowDefinition] = {}
self._workflow_action_servers: Dict[str, ActionServer] = {} self._workflow_action_servers: Dict[str, ActionServer] = {}
# 初始化基类 - ROS2ProtocolNode会处理子设备初始化 # 初始化基类 - ROS2ProtocolNode会处理所有设备管理
super().__init__( super().__init__(
device_id=device_id, device_id=device_id,
children=children, children=children,
protocol_type=protocol_type, protocol_type=protocol_type,
resource_tracker=resource_tracker, resource_tracker=resource_tracker,
workstation_config={
'communication_interfaces': communication_interfaces,
'deck_config': self.deck_config
},
*args, *args,
**kwargs **kwargs
) )
# 工作站特有的设备分类 (基于已初始化的sub_devices) # 使用父类的设备分类结果(不再重复分类)
self.communication_devices: Dict[str, Any] = {} # self.communication_devices 和 self.logical_devices 由 ROS2ProtocolNode 提供
self.logical_devices: Dict[str, Any] = {}
self._classify_devices()
# 初始化工作站模块 # 初始化工作站模块
self.communication: WorkstationCommunicationBase = self._create_communication_module() self.communication: WorkstationCommunicationBase = self._create_communication_module()
self.material_management: MaterialManagementBase = self._create_material_management_module() self.material_management: MaterialManagementBase = self._create_material_management_module()
# 设置工作站特定的通信接口
self._setup_workstation_communication_interfaces()
# 注册支持的工作流 # 注册支持的工作流
self._register_supported_workflows() self._register_supported_workflows()
@@ -166,62 +181,12 @@ class WorkstationBase(ROS2ProtocolNode, ABC):
# 启动状态监控 # 启动状态监控
self._start_status_monitoring() self._start_status_monitoring()
# 启动HTTP报送接收服务
self.http_service = None
self._start_http_service()
logger.info(f"增强工作站基类 {device_id} 初始化完成") logger.info(f"增强工作站基类 {device_id} 初始化完成")
def _classify_devices(self):
"""基于已初始化的设备进行分类"""
for device_id, device in self.sub_devices.items():
device_config = self.children.get(device_id, {})
device_type = DeviceType(device_config.get("device_type", "logical"))
if device_type == DeviceType.COMMUNICATION:
self.communication_devices[device_id] = device
logger.info(f"通信设备 {device_id} 已分类")
elif device_type == DeviceType.LOGICAL:
self.logical_devices[device_id] = device
logger.info(f"逻辑设备 {device_id} 已分类")
def _setup_workstation_communication_interfaces(self):
"""设置工作站特定的通信接口代理"""
for logical_device_id, logical_device in self.logical_devices.items():
# 检查是否有配置的通信接口
interface_config = self.communication_interfaces.get(logical_device_id)
if not interface_config:
continue
comm_device = self.communication_devices.get(interface_config.device_id)
if not comm_device:
logger.error(f"通信设备 {interface_config.device_id} 不存在")
continue
# 设置工作站级别的通信代理
self._setup_workstation_hardware_proxy(
logical_device,
comm_device,
interface_config
)
def _setup_workstation_hardware_proxy(self, logical_device, comm_device, interface: CommunicationInterface):
"""为逻辑设备设置工作站级通信代理"""
try:
# 获取通信设备的读写方法
read_func = getattr(comm_device.driver_instance, interface.read_method, None)
write_func = getattr(comm_device.driver_instance, interface.write_method, None)
if read_func:
setattr(logical_device.driver_instance, 'comm_read', read_func)
if write_func:
setattr(logical_device.driver_instance, 'comm_write', write_func)
# 设置通信配置
setattr(logical_device.driver_instance, 'comm_config', interface.config)
setattr(logical_device.driver_instance, 'comm_protocol', interface.protocol_type)
logger.info(f"为逻辑设备 {logical_device.device_id} 设置工作站通信代理 -> {comm_device.device_id}")
except Exception as e:
logger.error(f"设置工作站通信代理失败: {e}")
@abstractmethod @abstractmethod
def _create_communication_module(self) -> WorkstationCommunicationBase: def _create_communication_module(self) -> WorkstationCommunicationBase:
"""创建通信模块 - 子类必须实现""" """创建通信模块 - 子类必须实现"""
@@ -355,6 +320,579 @@ class WorkstationBase(ROS2ProtocolNode, ABC):
# 目前简化为按需查询 # 目前简化为按需查询
pass pass
def _start_http_service(self):
"""启动HTTP报送接收服务"""
try:
if not self.http_service_config.get("enabled", True):
logger.info("HTTP报送接收服务已禁用")
return
host = self.http_service_config.get("host", "127.0.0.1")
port = self.http_service_config.get("port", 8081)
self.http_service = WorkstationHTTPService(
workstation_instance=self,
host=host,
port=port
)
self.http_service.start()
logger.info(f"工作站 {self.device_id} HTTP报送接收服务启动成功: {self.http_service.service_url}")
except Exception as e:
logger.error(f"启动HTTP报送接收服务失败: {e}")
self.http_service = None
def _stop_http_service(self):
"""停止HTTP报送接收服务"""
try:
if self.http_service:
self.http_service.stop()
self.http_service = None
logger.info("HTTP报送接收服务已停止")
except Exception as e:
logger.error(f"停止HTTP报送接收服务失败: {e}")
# ============ 报送处理方法 ============
def process_material_change_report(self, report) -> Dict[str, Any]:
"""处理物料变更报送 - 同步到 ResourceTracker 并发送 ROS2 更新"""
try:
logger.info(f"处理物料变更报送: {report.workstation_id} -> {report.resource_id} ({report.change_type})")
# 增加接收计数
self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1
# 准备变更数据
changes = {
'workstation_id': report.workstation_id,
'timestamp': report.timestamp,
'change_type': report.change_type,
'resource_id': report.resource_id
}
# 添加额外的变更信息
if hasattr(report, 'new_location'):
changes['location'] = {
'x': getattr(report.new_location, 'x', 0),
'y': getattr(report.new_location, 'y', 0),
'z': getattr(report.new_location, 'z', 0)
}
if hasattr(report, 'quantity'):
changes['quantity'] = report.quantity
if hasattr(report, 'status'):
changes['status'] = report.status
# 同步到 ResourceTracker
sync_success = self.resource_tracker.update_material_state(
report.resource_id,
changes,
report.change_type
)
result = {
'processed': True,
'resource_id': report.resource_id,
'change_type': report.change_type,
'next_actions': [],
'tracker_sync': sync_success
}
# 发送 ROS2 ResourceUpdate 请求到 host node
if sync_success:
try:
self._send_resource_update_to_host(report.resource_id, changes)
result['ros_update_sent'] = True
result['next_actions'].append('ros_update_completed')
except Exception as e:
logger.warning(f"发送ROS2资源更新失败: {e}")
result['ros_update_sent'] = False
result['warnings'] = [f"ROS2更新失败: {str(e)}"]
# 根据变更类型处理
if report.change_type == 'created':
result['next_actions'].append('sync_to_global_registry')
self._handle_material_created(report)
elif report.change_type == 'updated':
result['next_actions'].append('update_local_state')
self._handle_material_updated(report)
elif report.change_type == 'moved':
result['next_actions'].append('update_location_tracking')
self._handle_material_moved(report)
elif report.change_type == 'consumed':
result['next_actions'].append('update_inventory')
self._handle_material_consumed(report)
elif report.change_type == 'completed':
result['next_actions'].append('trigger_next_workflow')
self._handle_material_completed(report)
# 更新本地物料管理系统(如果存在)
if hasattr(self, 'material_management'):
try:
self.material_management.sync_external_material_change(report)
except Exception as e:
logger.warning(f"同步物料变更到本地管理系统失败: {e}")
return result
except Exception as e:
logger.error(f"处理物料变更报送失败: {e}")
return {
'processed': False,
'error': str(e),
'next_actions': ['retry_processing']
}
def process_workflow_status_report(self, workstation_id: str, workflow_id: str,
status: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""处理工作流状态报送"""
try:
logger.info(f"处理工作流状态报送: {workstation_id} -> {workflow_id} ({status})")
# 增加接收计数
self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1
result = {
'processed': True,
'workflow_id': workflow_id,
'status': status
}
# 这里可以添加工作流状态同步逻辑
# 例如:更新本地工作流状态、触发后续动作等
return result
except Exception as e:
logger.error(f"处理工作流状态报送失败: {e}")
return {'processed': False, 'error': str(e)}
# ============ 统一报送处理方法基于LIMS协议规范 ============
def process_step_finish_report(self, request: WorkstationReportRequest) -> Dict[str, Any]:
"""处理步骤完成报送统一LIMS协议规范- 同步到 ResourceTracker"""
try:
data = request.data
logger.info(f"处理步骤完成报送: {data['orderCode']} - {data['stepName']} (步骤ID: {data['stepId']})")
# 增加接收计数
self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1
# 同步步骤信息到 ResourceTracker
step_changes = {
'order_code': data['orderCode'],
'order_name': data.get('orderName', ''),
'step_name': data['stepName'],
'step_id': data['stepId'],
'sample_id': data['sampleId'],
'start_time': data['startTime'],
'end_time': data['endTime'],
'execution_status': data.get('executionStatus', 'completed'),
'status': 'step_completed',
'last_updated': request.request_time
}
# 更新 ResourceTracker 中的样本状态
sample_sync_success = False
if data['sampleId']:
sample_sync_success = self.resource_tracker.update_material_state(
data['sampleId'],
{
'current_step': data['stepName'],
'step_status': 'completed',
'last_step_time': data['endTime'],
'execution_status': data.get('executionStatus', 'completed')
},
'step_finished'
)
result = {
'processed': True,
'order_code': data['orderCode'],
'step_id': data['stepId'],
'step_name': data['stepName'],
'sample_id': data['sampleId'],
'start_time': data['startTime'],
'end_time': data['endTime'],
'execution_status': data.get('executionStatus', 'completed'),
'next_actions': [],
'sample_sync': sample_sync_success
}
# 发送 ROS2 ResourceUpdate 到 host node
if sample_sync_success and data['sampleId']:
try:
self._send_resource_update_to_host(data['sampleId'], step_changes)
result['ros_update_sent'] = True
result['next_actions'].append('ros_step_update_completed')
except Exception as e:
logger.warning(f"发送ROS2步骤完成更新失败: {e}")
result['ros_update_sent'] = False
result['warnings'] = [f"ROS2更新失败: {str(e)}"]
# 处理步骤完成逻辑
try:
# 更新步骤状态
result['next_actions'].append('update_step_status')
# 检查是否触发后续步骤
result['next_actions'].append('check_next_step')
# 更新通量进度
result['next_actions'].append('update_sample_progress')
# 记录步骤完成事件
self._record_step_completion(data)
except Exception as e:
logger.warning(f"步骤完成处理过程中出现警告: {e}")
result['warnings'] = result.get('warnings', []) + [str(e)]
return result
except Exception as e:
logger.error(f"处理步骤完成报送失败: {e}")
return {
'processed': False,
'error': str(e),
'next_actions': ['retry_processing']
}
def process_sample_finish_report(self, request: WorkstationReportRequest) -> Dict[str, Any]:
"""处理通量完成报送统一LIMS协议规范"""
try:
data = request.data
logger.info(f"处理通量完成报送: {data['orderCode']} - 通量ID: {data['sampleId']} (状态: {data['Status']})")
# 增加接收计数
self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1
result = {
'processed': True,
'order_code': data['orderCode'],
'sample_id': data['sampleId'],
'status': data['Status'],
'start_time': data['startTime'],
'end_time': data['endTime'],
'next_actions': []
}
# 根据通量状态处理
status = int(data['Status'])
if status == 20: # 完成
result['next_actions'].extend(['update_sample_completed', 'check_order_completion'])
self._record_sample_completion(data, 'completed')
elif status == -2: # 异常停止
result['next_actions'].extend(['log_sample_error', 'trigger_error_handling'])
self._record_sample_completion(data, 'error')
elif status == -3: # 人工停止或取消
result['next_actions'].extend(['log_sample_cancelled', 'update_order_status'])
self._record_sample_completion(data, 'cancelled')
elif status == 10: # 开始
result['next_actions'].append('update_sample_started')
self._record_sample_start(data)
elif status == 2: # 进样
result['next_actions'].append('update_sample_intake')
self._record_sample_intake(data)
return result
except Exception as e:
logger.error(f"处理通量完成报送失败: {e}")
return {
'processed': False,
'error': str(e),
'next_actions': ['retry_processing']
}
def process_order_finish_report(self, request: WorkstationReportRequest, used_materials: List[MaterialUsage]) -> Dict[str, Any]:
"""处理任务完成报送统一LIMS协议规范"""
try:
data = request.data
logger.info(f"处理任务完成报送: {data['orderCode']} - {data['orderName']} (状态: {data['status']})")
# 增加接收计数
self._reports_received_count = getattr(self, '_reports_received_count', 0) + 1
result = {
'processed': True,
'order_code': data['orderCode'],
'order_name': data['orderName'],
'status': data['status'],
'start_time': data['startTime'],
'end_time': data['endTime'],
'used_materials_count': len(used_materials),
'next_actions': []
}
# 根据任务状态处理
status = int(data['status'])
if status == 30: # 完成
result['next_actions'].extend([
'update_order_completed',
'process_material_usage',
'generate_completion_report'
])
self._record_order_completion(data, used_materials, 'completed')
elif status == -11: # 异常停止
result['next_actions'].extend([
'log_order_error',
'trigger_error_handling',
'process_partial_material_usage'
])
self._record_order_completion(data, used_materials, 'error')
elif status == -12: # 人工停止或取消
result['next_actions'].extend([
'log_order_cancelled',
'revert_material_reservations'
])
self._record_order_completion(data, used_materials, 'cancelled')
# 处理物料使用记录
if used_materials:
material_usage_result = self._process_material_usage(used_materials)
result['material_usage'] = material_usage_result
return result
except Exception as e:
logger.error(f"处理任务完成报送失败: {e}")
return {
'processed': False,
'error': str(e),
'next_actions': ['retry_processing']
}
# ============ 具体的报送处理方法 ============
def _handle_material_created(self, report):
"""处理物料创建报送"""
try:
# 已废弃的方法,保留用于兼容性
logger.debug(f"处理物料创建: {getattr(report, 'resource_id', 'unknown')}")
except Exception as e:
logger.error(f"处理物料创建失败: {e}")
def _handle_material_updated(self, report):
"""处理物料更新报送"""
try:
logger.debug(f"处理物料更新: {getattr(report, 'resource_id', 'unknown')}")
except Exception as e:
logger.error(f"处理物料更新失败: {e}")
def _handle_material_moved(self, report):
"""处理物料移动报送"""
try:
logger.debug(f"处理物料移动: {getattr(report, 'resource_id', 'unknown')}")
except Exception as e:
logger.error(f"处理物料移动失败: {e}")
def _handle_material_consumed(self, report):
"""处理物料消耗报送"""
try:
logger.debug(f"处理物料消耗: {getattr(report, 'resource_id', 'unknown')}")
except Exception as e:
logger.error(f"处理物料消耗失败: {e}")
def _handle_material_completed(self, report):
"""处理物料完成报送"""
try:
logger.debug(f"处理物料完成: {getattr(report, 'resource_id', 'unknown')}")
except Exception as e:
logger.error(f"处理物料完成失败: {e}")
# ============ 工作流控制接口 ============
def handle_external_error(self, error_request):
"""处理外部错误请求"""
try:
logger.error(f"收到外部错误处理请求: {getattr(error_request, 'error_type', 'unknown')}")
return {
'success': True,
'message': "错误已记录",
'error_code': 'OK'
}
except Exception as e:
logger.error(f"处理外部错误失败: {e}")
return {
'success': False,
'message': f"错误处理失败: {str(e)}",
'error_code': 'ERROR_HANDLING_FAILED'
}
def _process_error_handling(self, error_request, error_record):
"""处理具体的错误类型"""
return {'success': True, 'actions_taken': ['已转换为统一报送']}
"""处理具体的错误类型"""
try:
result = {'success': True, 'actions_taken': []}
# 1. 如果有特定动作ID标记该动作失败
if error_request.action_id:
self._mark_action_failed(error_request.action_id, error_request.error_message)
result['actions_taken'].append(f"标记动作 {error_request.action_id} 为失败")
# 2. 如果有工作流ID停止相关工作流
if error_request.workflow_id:
self._handle_workflow_error(error_request.workflow_id, error_request.error_message)
result['actions_taken'].append(f"处理工作流 {error_request.workflow_id} 错误")
# 3. 根据错误类型执行特定处理
error_type = error_request.error_type.lower()
if error_type in ['material_error', 'resource_error']:
# 物料相关错误
material_result = self._handle_material_error(error_request)
result['actions_taken'].extend(material_result.get('actions', []))
elif error_type in ['device_error', 'communication_error']:
# 设备通信错误
device_result = self._handle_device_error(error_request)
result['actions_taken'].extend(device_result.get('actions', []))
elif error_type in ['workflow_error', 'process_error']:
# 工作流程错误
workflow_result = self._handle_process_error(error_request)
result['actions_taken'].extend(workflow_result.get('actions', []))
else:
# 通用错误处理
result['actions_taken'].append("执行通用错误处理")
# 4. 如果是严重错误,触发紧急停止
if error_request.error_type.lower() in ['critical_error', 'safety_error', 'emergency']:
self._trigger_emergency_stop(error_request.error_message)
result['actions_taken'].append("触发紧急停止")
result['message'] = "错误处理完成"
return result
except Exception as e:
logger.error(f"错误处理过程失败: {e}")
return {
'success': False,
'message': f"错误处理过程失败: {str(e)}",
'error_code': 'ERROR_PROCESSING_FAILED'
}
def _mark_action_failed(self, action_id: str, error_message: str):
"""标记指定动作为失败"""
try:
# 创建失败结果
failed_result = {
'success': False,
'error': True,
'error_message': error_message,
'timestamp': time.time(),
'marked_by_external_error': True
}
# 存储到动作结果缓存
self.action_results[action_id] = failed_result
# 如果当前有正在执行的动作,更新其状态
if self.current_action_context and self.current_action_context.get('action_id') == action_id:
self.current_action_context['failed'] = True
self.current_action_context['error_message'] = error_message
logger.info(f"动作 {action_id} 已标记为失败: {error_message}")
except Exception as e:
logger.error(f"标记动作失败时出错: {e}")
def _handle_workflow_error(self, workflow_id: str, error_message: str):
"""处理工作流错误"""
try:
# 如果是当前正在运行的工作流
if (self.current_workflow_info and
self.current_workflow_info.get('id') == workflow_id):
# 停止当前工作流
self.stop_workflow(emergency=True)
logger.info(f"因外部错误停止工作流 {workflow_id}: {error_message}")
except Exception as e:
logger.error(f"处理工作流错误失败: {e}")
def _handle_material_error(self, error_request):
"""处理物料相关错误(已废弃,请使用统一报送接口)"""
return {'success': True, 'message': '物料错误已记录'}
"""处理物料相关错误"""
actions = []
try:
# 可以触发物料重新扫描、位置重置等
if error_request.context and 'resource_id' in error_request.context:
resource_id = error_request.context['resource_id']
# 触发物料状态更新
actions.append(f"更新物料 {resource_id} 状态")
actions.append("执行物料错误恢复流程")
except Exception as e:
logger.error(f"处理物料错误失败: {e}")
return {'actions': actions}
def _handle_device_error(self, error_request):
"""处理设备相关错误(已废弃,请使用统一报送接口)"""
return {'success': True, 'message': '设备错误已记录'}
"""处理设备错误"""
actions = []
try:
if error_request.device_id:
# 重置设备连接
actions.append(f"重置设备 {error_request.device_id} 连接")
# 如果是通信设备,重新建立连接
if error_request.device_id in self.communication_devices:
actions.append(f"重新建立通信设备 {error_request.device_id} 连接")
actions.append("执行设备错误恢复流程")
except Exception as e:
logger.error(f"处理设备错误失败: {e}")
return {'actions': actions}
def _handle_process_error(self, error_request):
"""处理流程相关错误(已废弃,请使用统一报送接口)"""
return {'success': True, 'message': '流程错误已记录'}
"""处理工作流程错误"""
actions = []
try:
# 暂停当前工作流
if self.current_workflow_status not in [WorkflowStatus.IDLE, WorkflowStatus.STOPPED]:
actions.append("暂停当前工作流")
actions.append("执行工作流程错误恢复")
except Exception as e:
logger.error(f"处理工作流程错误失败: {e}")
return {'actions': actions}
def _trigger_emergency_stop(self, reason: str):
"""触发紧急停止"""
try:
logger.critical(f"触发紧急停止: {reason}")
# 停止所有工作流
self.stop_workflow(emergency=True)
# 设置错误状态
self.current_workflow_status = WorkflowStatus.ERROR
# 可以在这里添加更多紧急停止逻辑
# 例如:断开设备连接、保存当前状态等
except Exception as e:
logger.error(f"执行紧急停止失败: {e}")
# ============ 工作流控制接口 ============ # ============ 工作流控制接口 ============
def _handle_start_workflow(self, request, response): def _handle_start_workflow(self, request, response):
@@ -1298,5 +1836,199 @@ class WorkstationBase(ROS2ProtocolNode, ABC):
"active_workflows": self.active_dynamic_workflows, "active_workflows": self.active_dynamic_workflows,
"total_resources": self.workstation_resource_count, "total_resources": self.workstation_resource_count,
"communication_status": self.communication_status, "communication_status": self.communication_status,
"material_status": self.material_status "material_status": self.material_status,
"http_service_running": self.http_service.is_running if self.http_service else False
} }
# ============ 增强动作执行 - 支持错误处理和追踪 ============
async def execute_single_action(self, device_id, action_name, action_kwargs):
"""执行单个动作 - 增强版,支持错误处理和动作追踪"""
# 构建动作ID
if device_id in ["", None, "self"]:
action_id = f"/devices/{self.device_id}/{action_name}"
else:
action_id = f"/devices/{device_id}/{action_name}"
# 设置动作上下文
self.current_action_context = {
'action_id': action_id,
'device_id': device_id,
'action_name': action_name,
'action_kwargs': action_kwargs,
'start_time': time.time(),
'failed': False,
'error_message': None
}
try:
# 检查是否已被外部标记为失败
if action_id in self.action_results:
cached_result = self.action_results[action_id]
if cached_result.get('marked_by_external_error'):
logger.warning(f"动作 {action_id} 已被外部标记为失败")
return self._create_failed_result(cached_result['error_message'])
# 检查动作客户端是否存在
if action_id not in self._action_clients:
error_msg = f"找不到动作客户端: {action_id}"
self.lab_logger().error(error_msg)
return self._create_failed_result(error_msg)
# 发送动作请求
action_client = self._action_clients[action_id]
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
self.lab_logger().debug(f"发送动作请求到: {action_id}")
action_client.wait_for_server()
# 等待动作完成
request_future = action_client.send_goal_async(goal_msg)
handle = await request_future
if not handle.accepted:
error_msg = f"动作请求被拒绝: {action_name}"
self.lab_logger().error(error_msg)
return self._create_failed_result(error_msg)
# 在执行过程中检查是否被外部标记为失败
result_future = await handle.get_result_async()
# 再次检查是否在执行过程中被标记为失败
if self.current_action_context.get('failed'):
error_msg = self.current_action_context.get('error_message', '动作被外部标记为失败')
logger.warning(f"动作 {action_id} 在执行过程中被标记为失败: {error_msg}")
return self._create_failed_result(error_msg)
result = result_future.result
# 存储成功结果
self.action_results[action_id] = {
'success': True,
'result': result,
'timestamp': time.time(),
'execution_time': time.time() - self.current_action_context['start_time']
}
self.lab_logger().debug(f"动作完成: {action_name}")
return result
except Exception as e:
error_msg = f"动作执行异常: {str(e)}"
logger.error(f"执行动作 {action_id} 失败: {e}\n{traceback.format_exc()}")
return self._create_failed_result(error_msg)
finally:
# 清理动作上下文
self.current_action_context = None
def _create_failed_result(self, error_message: str):
"""创建失败结果对象"""
# 这需要根据具体的动作类型来创建相应的结果对象
# 这里返回一个通用的失败标识
class FailedResult:
def __init__(self, error_msg):
self.success = False
self.return_info = json.dumps({
"suc": False,
"error": True,
"error_message": error_msg,
"timestamp": time.time()
})
return FailedResult(error_message)
def __del__(self):
"""析构函数 - 清理HTTP服务"""
try:
self._stop_http_service()
self._stop_reporting_service()
except:
pass
# ============ LIMS辅助方法 ============
def _record_step_completion(self, step_data: Dict[str, Any]):
"""记录步骤完成事件"""
try:
logger.debug(f"记录步骤完成: {step_data['stepName']} - {step_data['stepId']}")
# 这里可以添加步骤完成的记录逻辑
# 例如:更新数据库、发送通知等
except Exception as e:
logger.error(f"记录步骤完成失败: {e}")
def _record_sample_completion(self, sample_data: Dict[str, Any], completion_type: str):
"""记录通量完成事件"""
try:
logger.debug(f"记录通量完成: {sample_data['sampleId']} - {completion_type}")
# 这里可以添加通量完成的记录逻辑
except Exception as e:
logger.error(f"记录通量完成失败: {e}")
def _record_sample_start(self, sample_data: Dict[str, Any]):
"""记录通量开始事件"""
try:
logger.debug(f"记录通量开始: {sample_data['sampleId']}")
# 这里可以添加通量开始的记录逻辑
except Exception as e:
logger.error(f"记录通量开始失败: {e}")
def _record_sample_intake(self, sample_data: Dict[str, Any]):
"""记录通量进样事件"""
try:
logger.debug(f"记录通量进样: {sample_data['sampleId']}")
# 这里可以添加通量进样的记录逻辑
except Exception as e:
logger.error(f"记录通量进样失败: {e}")
def _record_order_completion(self, order_data: Dict[str, Any], used_materials: List, completion_type: str):
"""记录任务完成事件"""
try:
logger.debug(f"记录任务完成: {order_data['orderCode']} - {completion_type}")
# 这里可以添加任务完成的记录逻辑
# 包括物料使用记录的处理
except Exception as e:
logger.error(f"记录任务完成失败: {e}")
def _process_material_usage(self, used_materials: List) -> Dict[str, Any]:
"""处理物料使用记录"""
try:
logger.debug(f"处理物料使用记录: {len(used_materials)}")
processed_materials = []
for material in used_materials:
material_record = {
'material_id': material.materialId,
'location_id': material.locationId,
'type_mode': material.typeMode,
'used_quantity': material.usedQuantity,
'processed_time': time.time()
}
processed_materials.append(material_record)
# 更新库存
self._update_material_inventory(material)
return {
'processed_count': len(processed_materials),
'materials': processed_materials,
'success': True
}
except Exception as e:
logger.error(f"处理物料使用记录失败: {e}")
return {
'processed_count': 0,
'materials': [],
'success': False,
'error': str(e)
}
def _update_material_inventory(self, material):
"""更新物料库存"""
try:
# 这里可以添加库存更新逻辑
# 例如调用库存管理系统API、更新本地缓存等
logger.debug(f"更新物料库存: {material.materialId} - 使用量: {material.usedQuantity}")
except Exception as e:
logger.error(f"更新物料库存失败: {e}")

View File

@@ -0,0 +1,605 @@
"""
工作站HTTP服务模块
Workstation HTTP Service Module
统一的工作站报送接收服务基于LIMS协议规范
1. 步骤完成报送 - POST /report/step_finish
2. 通量完成报送 - POST /report/sample_finish
3. 任务完成报送 - POST /report/order_finish
4. 批量更新报送 - POST /report/batch_update
5. 物料变更报送 - POST /report/material_change
6. 错误处理报送 - POST /report/error_handling
7. 健康检查和状态查询
统一使用LIMS协议字段规范简化接口避免功能重复
"""
import json
import threading
import time
import traceback
from typing import Dict, Any, Optional, List
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse
from dataclasses import dataclass, asdict
from datetime import datetime
from unilabos.utils.log import logger
@dataclass
class WorkstationReportRequest:
"""统一工作站报送请求基于LIMS协议规范"""
token: str # 授权令牌
request_time: str # 请求时间格式2024-12-12 12:12:12.xxx
data: Dict[str, Any] # 报送数据
@dataclass
class MaterialUsage:
"""物料使用记录"""
materialId: str # 物料IdGUID
locationId: str # 库位IdGUID
typeMode: str # 物料类型样品1、试剂2、耗材0
usedQuantity: float # 使用的数量(数字)
@dataclass
class HttpResponse:
"""HTTP响应"""
success: bool
message: str
data: Optional[Dict[str, Any]] = None
acknowledgment_id: Optional[str] = None
class WorkstationHTTPHandler(BaseHTTPRequestHandler):
"""工作站HTTP请求处理器"""
def __init__(self, workstation_instance, *args, **kwargs):
self.workstation = workstation_instance
super().__init__(*args, **kwargs)
def do_POST(self):
"""处理POST请求 - 统一的工作站报送接口"""
try:
# 解析请求路径
parsed_path = urlparse(self.path)
endpoint = parsed_path.path
# 读取请求体
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
post_data = self.rfile.read(content_length)
request_data = json.loads(post_data.decode('utf-8'))
else:
request_data = {}
logger.info(f"收到工作站报送: {endpoint} - {request_data.get('token', 'unknown')}")
# 统一的报送端点路由基于LIMS协议规范
if endpoint == '/report/step_finish':
response = self._handle_step_finish_report(request_data)
elif endpoint == '/report/sample_finish':
response = self._handle_sample_finish_report(request_data)
elif endpoint == '/report/order_finish':
response = self._handle_order_finish_report(request_data)
elif endpoint == '/report/batch_update':
response = self._handle_batch_update_report(request_data)
# 扩展报送端点
elif endpoint == '/report/material_change':
response = self._handle_material_change_report(request_data)
elif endpoint == '/report/error_handling':
response = self._handle_error_handling_report(request_data)
# 保留LIMS协议端点以兼容现有系统
elif endpoint == '/LIMS/step_finish':
response = self._handle_step_finish_report(request_data)
elif endpoint == '/LIMS/preintake_finish':
response = self._handle_sample_finish_report(request_data)
elif endpoint == '/LIMS/order_finish':
response = self._handle_order_finish_report(request_data)
else:
response = HttpResponse(
success=False,
message=f"不支持的报送端点: {endpoint}",
data={"supported_endpoints": [
"/report/step_finish",
"/report/sample_finish",
"/report/order_finish",
"/report/batch_update",
"/report/material_change",
"/report/error_handling"
]}
)
# 发送响应
self._send_response(response)
except Exception as e:
logger.error(f"处理工作站报送失败: {e}\\n{traceback.format_exc()}")
error_response = HttpResponse(
success=False,
message=f"请求处理失败: {str(e)}"
)
self._send_response(error_response)
def do_GET(self):
"""处理GET请求 - 健康检查和状态查询"""
try:
parsed_path = urlparse(self.path)
endpoint = parsed_path.path
if endpoint == '/status':
response = self._handle_status_check()
elif endpoint == '/health':
response = HttpResponse(success=True, message="服务健康")
else:
response = HttpResponse(
success=False,
message=f"不支持的查询端点: {endpoint}",
data={"supported_endpoints": ["/status", "/health"]}
)
self._send_response(response)
except Exception as e:
logger.error(f"GET请求处理失败: {e}")
error_response = HttpResponse(
success=False,
message=f"GET请求处理失败: {str(e)}"
)
self._send_response(error_response)
def _handle_step_finish_report(self, request_data: Dict[str, Any]) -> HttpResponse:
"""处理步骤完成报送统一LIMS协议规范"""
try:
# 验证基本字段
required_fields = ['token', 'request_time', 'data']
if missing_fields := [field for field in required_fields if field not in request_data]:
return HttpResponse(
success=False,
message=f"缺少必要字段: {', '.join(missing_fields)}"
)
# 验证data字段内容
data = request_data['data']
data_required_fields = ['orderCode', 'orderName', 'stepName', 'stepId', 'sampleId', 'startTime', 'endTime']
if data_missing_fields := [field for field in data_required_fields if field not in data]:
return HttpResponse(
success=False,
message=f"data字段缺少必要内容: {', '.join(data_missing_fields)}"
)
# 创建统一请求对象
report_request = WorkstationReportRequest(
token=request_data['token'],
request_time=request_data['request_time'],
data=data
)
# 调用工作站处理方法
result = self.workstation.process_step_finish_report(report_request)
return HttpResponse(
success=True,
message=f"步骤完成报送已处理: {data['stepName']} ({data['orderCode']})",
acknowledgment_id=f"STEP_{int(time.time() * 1000)}_{data['stepId']}",
data=result
)
except Exception as e:
logger.error(f"处理步骤完成报送失败: {e}")
return HttpResponse(
success=False,
message=f"步骤完成报送处理失败: {str(e)}"
)
def _handle_sample_finish_report(self, request_data: Dict[str, Any]) -> HttpResponse:
"""处理通量完成报送统一LIMS协议规范"""
try:
# 验证基本字段
required_fields = ['token', 'request_time', 'data']
if missing_fields := [field for field in required_fields if field not in request_data]:
return HttpResponse(
success=False,
message=f"缺少必要字段: {', '.join(missing_fields)}"
)
# 验证data字段内容
data = request_data['data']
data_required_fields = ['orderCode', 'orderName', 'sampleId', 'startTime', 'endTime', 'Status']
if data_missing_fields := [field for field in data_required_fields if field not in data]:
return HttpResponse(
success=False,
message=f"data字段缺少必要内容: {', '.join(data_missing_fields)}"
)
# 创建统一请求对象
report_request = WorkstationReportRequest(
token=request_data['token'],
request_time=request_data['request_time'],
data=data
)
# 调用工作站处理方法
result = self.workstation.process_sample_finish_report(report_request)
status_names = {
"0": "待生产", "2": "进样", "10": "开始",
"20": "完成", "-2": "异常停止", "-3": "人工停止"
}
status_desc = status_names.get(str(data['Status']), f"状态{data['Status']}")
return HttpResponse(
success=True,
message=f"通量完成报送已处理: {data['sampleId']} ({data['orderCode']}) - {status_desc}",
acknowledgment_id=f"SAMPLE_{int(time.time() * 1000)}_{data['sampleId']}",
data=result
)
except Exception as e:
logger.error(f"处理通量完成报送失败: {e}")
return HttpResponse(
success=False,
message=f"通量完成报送处理失败: {str(e)}"
)
def _handle_order_finish_report(self, request_data: Dict[str, Any]) -> HttpResponse:
"""处理任务完成报送统一LIMS协议规范"""
try:
# 验证基本字段
required_fields = ['token', 'request_time', 'data']
if missing_fields := [field for field in required_fields if field not in request_data]:
return HttpResponse(
success=False,
message=f"缺少必要字段: {', '.join(missing_fields)}"
)
# 验证data字段内容
data = request_data['data']
data_required_fields = ['orderCode', 'orderName', 'startTime', 'endTime', 'status']
if data_missing_fields := [field for field in data_required_fields if field not in data]:
return HttpResponse(
success=False,
message=f"data字段缺少必要内容: {', '.join(data_missing_fields)}"
)
# 处理物料使用记录
used_materials = []
if 'usedMaterials' in data:
for material_data in data['usedMaterials']:
material = MaterialUsage(
materialId=material_data.get('materialId', ''),
locationId=material_data.get('locationId', ''),
typeMode=material_data.get('typeMode', ''),
usedQuantity=material_data.get('usedQuantity', 0.0)
)
used_materials.append(material)
# 创建统一请求对象
report_request = WorkstationReportRequest(
token=request_data['token'],
request_time=request_data['request_time'],
data=data
)
# 调用工作站处理方法
result = self.workstation.process_order_finish_report(report_request, used_materials)
status_names = {"30": "完成", "-11": "异常停止", "-12": "人工停止"}
status_desc = status_names.get(str(data['status']), f"状态{data['status']}")
return HttpResponse(
success=True,
message=f"任务完成报送已处理: {data['orderName']} ({data['orderCode']}) - {status_desc}",
acknowledgment_id=f"ORDER_{int(time.time() * 1000)}_{data['orderCode']}",
data=result
)
except Exception as e:
logger.error(f"处理任务完成报送失败: {e}")
return HttpResponse(
success=False,
message=f"任务完成报送处理失败: {str(e)}"
)
def _handle_batch_update_report(self, request_data: Dict[str, Any]) -> HttpResponse:
"""处理批量报送"""
try:
step_updates = request_data.get('step_updates', [])
sample_updates = request_data.get('sample_updates', [])
order_updates = request_data.get('order_updates', [])
results = {
'step_results': [],
'sample_results': [],
'order_results': [],
'total_processed': 0,
'total_failed': 0
}
# 处理批量步骤更新
for step_data in step_updates:
try:
step_data['token'] = request_data.get('token', step_data.get('token'))
step_data['request_time'] = request_data.get('request_time', step_data.get('request_time'))
result = self._handle_step_finish_report(step_data)
results['step_results'].append(result)
if result.success:
results['total_processed'] += 1
else:
results['total_failed'] += 1
except Exception as e:
results['step_results'].append(HttpResponse(success=False, message=str(e)))
results['total_failed'] += 1
# 处理批量通量更新
for sample_data in sample_updates:
try:
sample_data['token'] = request_data.get('token', sample_data.get('token'))
sample_data['request_time'] = request_data.get('request_time', sample_data.get('request_time'))
result = self._handle_sample_finish_report(sample_data)
results['sample_results'].append(result)
if result.success:
results['total_processed'] += 1
else:
results['total_failed'] += 1
except Exception as e:
results['sample_results'].append(HttpResponse(success=False, message=str(e)))
results['total_failed'] += 1
# 处理批量任务更新
for order_data in order_updates:
try:
order_data['token'] = request_data.get('token', order_data.get('token'))
order_data['request_time'] = request_data.get('request_time', order_data.get('request_time'))
result = self._handle_order_finish_report(order_data)
results['order_results'].append(result)
if result.success:
results['total_processed'] += 1
else:
results['total_failed'] += 1
except Exception as e:
results['order_results'].append(HttpResponse(success=False, message=str(e)))
results['total_failed'] += 1
return HttpResponse(
success=results['total_failed'] == 0,
message=f"批量报送处理完成: {results['total_processed']} 成功, {results['total_failed']} 失败",
acknowledgment_id=f"BATCH_{int(time.time() * 1000)}",
data=results
)
except Exception as e:
logger.error(f"处理批量报送失败: {e}")
return HttpResponse(
success=False,
message=f"批量报送处理失败: {str(e)}"
)
def _handle_material_change_report(self, request_data: Dict[str, Any]) -> HttpResponse:
"""处理物料变更报送"""
try:
# 验证必需字段
required_fields = ['workstation_id', 'timestamp', 'resource_id', 'change_type']
if missing_fields := [field for field in required_fields if field not in request_data]:
return HttpResponse(
success=False,
message=f"缺少必要字段: {', '.join(missing_fields)}"
)
# 调用工作站的处理方法
result = self.workstation.process_material_change_report(request_data)
return HttpResponse(
success=True,
message=f"物料变更报送已处理: {request_data['resource_id']} ({request_data['change_type']})",
acknowledgment_id=f"MATERIAL_{int(time.time() * 1000)}_{request_data['resource_id']}",
data=result
)
except Exception as e:
logger.error(f"处理物料变更报送失败: {e}")
return HttpResponse(
success=False,
message=f"物料变更报送处理失败: {str(e)}"
)
def _handle_error_handling_report(self, request_data: Dict[str, Any]) -> HttpResponse:
"""处理错误处理报送"""
try:
# 验证必需字段
required_fields = ['workstation_id', 'timestamp', 'error_type', 'error_message']
if missing_fields := [field for field in required_fields if field not in request_data]:
return HttpResponse(
success=False,
message=f"缺少必要字段: {', '.join(missing_fields)}"
)
# 调用工作站的处理方法
result = self.workstation.handle_external_error(request_data)
return HttpResponse(
success=True,
message=f"错误处理报送已处理: {request_data['error_type']} - {request_data['error_message']}",
acknowledgment_id=f"ERROR_{int(time.time() * 1000)}_{request_data.get('action_id', 'unknown')}",
data=result
)
except Exception as e:
logger.error(f"处理错误处理报送失败: {e}")
return HttpResponse(
success=False,
message=f"错误处理报送处理失败: {str(e)}"
)
def _handle_status_check(self) -> HttpResponse:
"""处理状态查询"""
try:
return HttpResponse(
success=True,
message="工作站报送服务正常运行",
data={
"workstation_id": self.workstation.device_id,
"service_type": "unified_reporting_service",
"uptime": time.time() - getattr(self.workstation, '_start_time', time.time()),
"reports_received": getattr(self.workstation, '_reports_received_count', 0),
"supported_endpoints": [
"POST /report/step_finish",
"POST /report/sample_finish",
"POST /report/order_finish",
"POST /report/batch_update",
"POST /report/material_change",
"POST /report/error_handling",
"GET /status",
"GET /health"
]
}
)
except Exception as e:
logger.error(f"处理状态查询失败: {e}")
return HttpResponse(
success=False,
message=f"状态查询失败: {str(e)}"
)
def _send_response(self, response: HttpResponse):
"""发送响应"""
try:
# 设置响应状态码
status_code = 200 if response.success else 400
self.send_response(status_code)
# 设置响应头
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
# 发送响应体
response_json = json.dumps(asdict(response), ensure_ascii=False, indent=2)
self.wfile.write(response_json.encode('utf-8'))
except Exception as e:
logger.error(f"发送响应失败: {e}")
def log_message(self, format, *args):
"""重写日志方法"""
logger.debug(f"HTTP请求: {format % args}")
class WorkstationHTTPService:
"""工作站HTTP服务"""
def __init__(self, workstation_instance, host: str = "127.0.0.1", port: int = 8080):
self.workstation = workstation_instance
self.host = host
self.port = port
self.server = None
self.server_thread = None
self.running = False
# 初始化统计信息
self.workstation._start_time = time.time()
self.workstation._reports_received_count = 0
def start(self):
"""启动HTTP服务"""
try:
# 创建处理器工厂函数
def handler_factory(*args, **kwargs):
return WorkstationHTTPHandler(self.workstation, *args, **kwargs)
# 创建HTTP服务器
self.server = HTTPServer((self.host, self.port), handler_factory)
# 在单独线程中运行服务器
self.server_thread = threading.Thread(
target=self._run_server,
daemon=True,
name=f"WorkstationHTTP-{self.workstation.device_id}"
)
self.running = True
self.server_thread.start()
logger.info(f"工作站HTTP报送服务已启动: http://{self.host}:{self.port}")
logger.info("统一的报送端点 (基于LIMS协议规范):")
logger.info(" - POST /report/step_finish # 步骤完成报送")
logger.info(" - POST /report/sample_finish # 通量完成报送")
logger.info(" - POST /report/order_finish # 任务完成报送")
logger.info(" - POST /report/batch_update # 批量更新报送")
logger.info("扩展报送端点:")
logger.info(" - POST /report/material_change # 物料变更报送")
logger.info(" - POST /report/error_handling # 错误处理报送")
logger.info("兼容端点:")
logger.info(" - POST /LIMS/step_finish # 兼容LIMS步骤完成")
logger.info(" - POST /LIMS/preintake_finish # 兼容LIMS通量完成")
logger.info(" - POST /LIMS/order_finish # 兼容LIMS任务完成")
logger.info("服务端点:")
logger.info(" - GET /status # 服务状态查询")
logger.info(" - GET /health # 健康检查")
except Exception as e:
logger.error(f"启动HTTP服务失败: {e}")
raise
def stop(self):
"""停止HTTP服务"""
try:
if self.running and self.server:
self.running = False
self.server.shutdown()
self.server.server_close()
if self.server_thread and self.server_thread.is_alive():
self.server_thread.join(timeout=5.0)
logger.info("工作站HTTP报送服务已停止")
except Exception as e:
logger.error(f"停止HTTP服务失败: {e}")
def _run_server(self):
"""运行HTTP服务器"""
try:
while self.running:
self.server.handle_request()
except Exception as e:
if self.running: # 只在非正常停止时记录错误
logger.error(f"HTTP服务运行错误: {e}")
@property
def is_running(self) -> bool:
"""检查服务是否正在运行"""
return self.running and self.server_thread and self.server_thread.is_alive()
@property
def service_url(self) -> str:
"""获取服务URL"""
return f"http://{self.host}:{self.port}"
# 导出主要类 - 保持向后兼容
@dataclass
class MaterialChangeReport:
"""已废弃物料变更报送请使用统一的WorkstationReportRequest"""
pass
@dataclass
class TaskExecutionReport:
"""已废弃任务执行报送请使用统一的WorkstationReportRequest"""
pass
# 导出列表
__all__ = [
'WorkstationReportRequest',
'MaterialUsage',
'HttpResponse',
'WorkstationHTTPService',
# 向后兼容
'MaterialChangeReport',
'TaskExecutionReport'
]

View File

@@ -43,6 +43,7 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
children: dict, children: dict,
protocol_type: Union[str, list[str]], protocol_type: Union[str, list[str]],
resource_tracker: DeviceNodeResourceTracker, resource_tracker: DeviceNodeResourceTracker,
workstation_config: dict = None, # 新增:工作站配置
*args, *args,
**kwargs, **kwargs,
): ):
@@ -50,6 +51,8 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
# 初始化其它属性 # 初始化其它属性
self.children = children self.children = children
self.workstation_config = workstation_config or {} # 新增:保存工作站配置
self.communication_interfaces = self.workstation_config.get('communication_interfaces', {}) # 从工作站配置获取通信接口
self._busy = False self._busy = False
self.sub_devices = {} self.sub_devices = {}
self._goals = {} self._goals = {}
@@ -69,6 +72,18 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
# 初始化子设备 # 初始化子设备
self.communication_node_id_to_instance = {} self.communication_node_id_to_instance = {}
self._initialize_child_devices()
# 设置硬件接口代理
self._setup_hardware_proxies()
self.lab_logger().info(f"ROS2ProtocolNode {device_id} initialized with protocols: {self.protocol_names}")
def _initialize_child_devices(self):
"""初始化子设备 - 重构为更清晰的方法"""
# 设备分类字典 - 统一管理
self.communication_devices = {}
self.logical_devices = {}
for device_id, device_config in self.children.items(): for device_id, device_config in self.children.items():
if device_config.get("type", "device") != "device": if device_config.get("type", "device") != "device":
@@ -76,50 +91,116 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping." f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping."
) )
continue continue
try: try:
d = self.initialize_device(device_id, device_config) d = self.initialize_device(device_id, device_config)
if d is None:
continue
# 统一的设备分类逻辑
device_type = device_config.get("device_type", "logical")
# 兼容旧的ID匹配方式和新的配置方式
if device_type == "communication" or "serial_" in device_id or "io_" in device_id:
self.communication_node_id_to_instance[device_id] = d # 保持向后兼容
self.communication_devices[device_id] = d # 新的统一方式
self.lab_logger().info(f"通信设备 {device_id} 初始化并分类成功")
elif device_type == "logical":
self.logical_devices[device_id] = d
self.lab_logger().info(f"逻辑设备 {device_id} 初始化并分类成功")
else:
# 默认作为逻辑设备处理
self.logical_devices[device_id] = d
self.lab_logger().info(f"设备 {device_id} 作为逻辑设备处理")
except Exception as ex: except Exception as ex:
self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}") self.lab_logger().error(f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}")
d = None
if d is None:
continue
if "serial_" in device_id or "io_" in device_id:
self.communication_node_id_to_instance[device_id] = d
continue
def _setup_hardware_proxies(self):
"""设置硬件接口代理 - 重构为独立方法,支持工作站配置"""
# 1. 传统的协议节点硬件代理设置
for device_id, device_config in self.children.items(): for device_id, device_config in self.children.items():
if device_config.get("type", "device") != "device": if device_config.get("type", "device") != "device":
continue continue
# 设置硬件接口代理 # 设置硬件接口代理
if device_id not in self.sub_devices: if device_id not in self.sub_devices:
self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...") self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...")
continue continue
d = self.sub_devices[device_id] d = self.sub_devices[device_id]
if d: if d:
hardware_interface = d.ros_node_instance._hardware_interface self._setup_device_hardware_proxy(device_id, d)
if (
hasattr(d.driver_instance, hardware_interface["name"])
and hasattr(d.driver_instance, hardware_interface["write"])
and (hardware_interface["read"] is None or hasattr(d.driver_instance, hardware_interface["read"]))
):
name = getattr(d.driver_instance, hardware_interface["name"]) # 2. 工作站配置的通信接口代理设置
read = hardware_interface.get("read", None) if hasattr(self, 'communication_interfaces') and self.communication_interfaces:
write = hardware_interface.get("write", None) self._setup_workstation_communication_interfaces()
# 如果硬件接口是字符串,通过通信设备提供 self.lab_logger().info(f"ROS2ProtocolNode {self.device_id} initialized with protocols: {self.protocol_names}")
if isinstance(name, str) and name in self.sub_devices:
communicate_device = self.sub_devices[name]
communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface
self._setup_hardware_proxy(d, self.sub_devices[name], read, write)
self.lab_logger().info(
f"\n通信代理:为子设备{device_id}\n "
f"添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n "
f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})"
)
self.lab_logger().info(f"ROS2ProtocolNode {device_id} initialized with protocols: {self.protocol_names}") def _setup_workstation_communication_interfaces(self):
"""设置工作站特定的通信接口代理"""
for logical_device_id, logical_device in self.logical_devices.items():
# 检查是否有配置的通信接口
interface_config = getattr(self, 'communication_interfaces', {}).get(logical_device_id)
if not interface_config:
continue
comm_device = self.communication_devices.get(interface_config.device_id)
if not comm_device:
self.lab_logger().error(f"通信设备 {interface_config.device_id} 不存在")
continue
# 设置工作站级别的通信代理
self._setup_workstation_hardware_proxy(
logical_device,
comm_device,
interface_config
)
def _setup_workstation_hardware_proxy(self, logical_device, comm_device, interface_config):
"""为逻辑设备设置工作站级通信代理"""
try:
# 获取通信设备的读写方法
read_func = getattr(comm_device.driver_instance, interface_config.read_method, None)
write_func = getattr(comm_device.driver_instance, interface_config.write_method, None)
if read_func:
setattr(logical_device.driver_instance, 'comm_read', read_func)
if write_func:
setattr(logical_device.driver_instance, 'comm_write', write_func)
# 设置通信配置
setattr(logical_device.driver_instance, 'comm_config', interface_config.config)
setattr(logical_device.driver_instance, 'comm_protocol', interface_config.protocol_type)
self.lab_logger().info(f"为逻辑设备 {logical_device.device_id} 设置工作站通信代理 -> {comm_device.device_id}")
except Exception as e:
self.lab_logger().error(f"设置工作站通信代理失败: {e}")
def _setup_device_hardware_proxy(self, device_id: str, device):
"""为单个设备设置硬件接口代理"""
hardware_interface = device.ros_node_instance._hardware_interface
if (
hasattr(device.driver_instance, hardware_interface["name"])
and hasattr(device.driver_instance, hardware_interface["write"])
and (hardware_interface["read"] is None or hasattr(device.driver_instance, hardware_interface["read"]))
):
name = getattr(device.driver_instance, hardware_interface["name"])
read = hardware_interface.get("read", None)
write = hardware_interface.get("write", None)
# 如果硬件接口是字符串,通过通信设备提供
if isinstance(name, str) and name in self.sub_devices:
communicate_device = self.sub_devices[name]
communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface
self._setup_hardware_proxy(device, self.sub_devices[name], read, write)
self.lab_logger().info(
f"\n通信代理:为子设备{device_id}\n "
f"添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n "
f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})"
)
def _setup_protocol_names(self, protocol_type): def _setup_protocol_names(self, protocol_type):
# 处理协议类型 # 处理协议类型