diff --git a/unilabos/ros/nodes/presets/workstation.py b/unilabos/ros/nodes/presets/workstation.py index de4e94cf..4e2dbef8 100644 --- a/unilabos/ros/nodes/presets/workstation.py +++ b/unilabos/ros/nodes/presets/workstation.py @@ -70,50 +70,12 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): resource_tracker=resource_tracker, ) - self.workstation_config = children - self.communication_interfaces = self.workstation_config.get( - "communication_interfaces", {} - ) # 从工作站配置获取通信接口 - - # 新增:获取工作站实例(如果存在) - self.workstation_instance = driver_instance - self._busy = False self.sub_devices = {} - self.communication_devices = {} - self.logical_devices = {} - self._goals = {} - self._protocol_servers = {} self._action_clients = {} + # 初始化子设备 - self._initialize_child_devices() - - if isinstance(getattr(driver_instance, "hardware_interface", None), str): - self.logical_devices[device_id] = driver_instance - else: - self.communication_devices[device_id] = driver_instance - - # 设置硬件接口代理 - for device_id, device_node in self.logical_devices.items(): - if device_node and hasattr(device_node, "ros_node_instance"): - self._setup_device_hardware_proxy(device_id, device_node) - - # 新增:如果有工作站实例,建立双向引用和硬件接口设置 - if self.workstation_instance: - self._setup_workstation_integration() - - def _setup_workstation_integration(self): - """设置工作站集成 - 统一设备处理模式""" - # 1. 建立协议节点引用 - self.workstation_instance.set_workstation_node(self) - - self.lab_logger().info( - f"ROS2WorkstationNode {self.device_id} 与工作站实例 {type(self.workstation_instance).__name__} 集成完成" - ) - - def _initialize_child_devices(self): - """初始化子设备 - 重构为更清晰的方法""" - # 设备分类字典 - 统一管理 + self.communication_node_id_to_instance = {} for device_id, device_config in self.children.items(): if device_config.get("type", "device") != "device": @@ -121,128 +83,52 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): f"[Protocol Node] Skipping type {device_config['type']} {device_id} already existed, skipping." ) continue - try: d = self.initialize_device(device_id, device_config) - if d is None: - continue - - # 统一的设备分类逻辑 - device_type = device_config.get("device_type", "logical") - - # 兼容旧的ID匹配方式和新的配置方式 - if device_type == "communication" or "serial_" in device_id or "io_" in device_id: - self.communication_devices[device_id] = d # 新的统一方式 - self.lab_logger().info(f"通信设备 {device_id} 初始化并分类成功") - elif device_type == "logical": - self.logical_devices[device_id] = d - self.lab_logger().info(f"逻辑设备 {device_id} 初始化并分类成功") - else: - # 默认作为逻辑设备处理 - self.logical_devices[device_id] = d - self.lab_logger().info(f"设备 {device_id} 作为逻辑设备处理") - except Exception as ex: self.lab_logger().error( - f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}" - ) + f"[Protocol Node] Failed to initialize device {device_id}: {ex}\n{traceback.format_exc()}") + d = None + if d is None: + continue - def _setup_device_hardware_proxy(self, device_id: str, device: ROS2DeviceNode): - """统一的设备硬件接口代理设置方法 + if "serial_" in device_id or "io_" in device_id: + self.communication_node_id_to_instance[device_id] = d + continue - Args: - device_id: 设备ID - device: 设备实例 - """ - hardware_interface = device.ros_node_instance._hardware_interface - if not self._validate_hardware_interface(device, hardware_interface): - return + for device_id, device_config in self.children.items(): + if device_config.get("type", "device") != "device": + continue + # 设置硬件接口代理 + if device_id not in self.sub_devices: + self.lab_logger().error(f"[Protocol Node] {device_id} 还没有正确初始化,跳过...") + continue + d = self.sub_devices[device_id] + if d: + hardware_interface = d.ros_node_instance._hardware_interface + if ( + hasattr(d.driver_instance, hardware_interface["name"]) + and hasattr(d.driver_instance, hardware_interface["write"]) + and ( + hardware_interface["read"] is None or hasattr(d.driver_instance, hardware_interface["read"])) + ): - # 获取硬件接口名称 - interface_name = getattr(device.driver_instance, hardware_interface["name"]) + name = getattr(d.driver_instance, hardware_interface["name"]) + read = hardware_interface.get("read", None) + write = hardware_interface.get("write", None) - # 情况1: 如果interface_name是字符串,说明需要转发到其他设备 - if isinstance(interface_name, str): - # 查找目标设备 - communication_device = self.communication_devices.get(device_id, None) - if not communication_device: - self.lab_logger().error(f"转发目标设备 {device_id} 不存在") - return + # 如果硬件接口是字符串,通过通信设备提供 + if isinstance(name, str) and name in self.sub_devices: + communicate_device = self.sub_devices[name] + communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface + self._setup_hardware_proxy(d, self.sub_devices[name], read, write) + self.lab_logger().info( + f"\n通信代理:为子设备{device_id}\n " + f"添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n " + f"添加了{write}方法(来源:{name} {communicate_hardware_info['read']})" + ) - read_method = hardware_interface.get("read", None) - write_method = hardware_interface.get("write", None) - - # 设置传统硬件代理 - communicate_hardware_info = communication_device.ros_node_instance._hardware_interface - self._setup_hardware_proxy(device, communication_device, read_method, write_method) - self.lab_logger().info( - f"传统通信代理:为子设备{device.device_id} " - f"添加了{read_method}方法(来源:{communication_device.device_id} {communicate_hardware_info['read']}) " - f"添加了{write_method}方法(来源:{communication_device.device_id} {communicate_hardware_info['write']})" - ) - self.lab_logger().info(f"字符串转发代理:设备 {device.device_id} -> {device_id}") - - # 情况2: 如果设备有communication_interface配置,设置协议代理 - elif hasattr(self, "communication_interfaces") and device_id in self.communication_interfaces: - interface_config = self._get_communication_interface_config(device_id) - protocol_type = interface_config.get("protocol_type", "modbus") - self._setup_communication_proxy(device, interface_config, protocol_type) - - # 情况3: 其他情况,使用默认处理 - else: - self.lab_logger().debug(f"设备 {device_id} 使用默认硬件接口处理") - - def _get_communication_interface_config(self, device_id: str) -> dict: - """获取设备的通信接口配置""" - # 优先从工作站配置获取 - if hasattr(self, "communication_interfaces") and device_id in self.communication_interfaces: - return self.communication_interfaces[device_id] - - # 从设备自身配置获取 - device_node = self.logical_devices[device_id] - if device_node and hasattr(device_node.driver_instance, "communication_interface"): - return getattr(device_node.driver_instance, "communication_interface") - - return {} - - def _validate_hardware_interface(self, device: ROS2DeviceNode, hardware_interface: dict) -> bool: - """验证硬件接口配置""" - return ( - hasattr(device.driver_instance, hardware_interface["name"]) - and hasattr(device.driver_instance, hardware_interface["write"]) - and (hardware_interface["read"] is None or hasattr(device.driver_instance, hardware_interface["read"])) - ) - - def _setup_communication_proxy(self, logical_device: ROS2DeviceNode, interface_config, protocol_type): - """为逻辑设备设置通信代理 - 统一方法""" - try: - # 获取通信设备 - comm_device_id = interface_config.get("device_id") - comm_device = self.communication_devices.get(comm_device_id) - - if not comm_device: - self.lab_logger().error(f"通信设备 {comm_device_id} 不存在") - return - # FIXME http、modbus(tcpip)都是支持多客户端的 - # 根据协议类型设置不同的代理方法 - if protocol_type == "modbus": - self._setup_modbus_proxy(logical_device, comm_device, interface_config) - elif protocol_type == "opcua": - self._setup_opcua_proxy(logical_device, comm_device, interface_config) - elif protocol_type == "http": - self._setup_http_proxy(logical_device, comm_device, interface_config) - elif protocol_type == "serial": - self._setup_serial_proxy(logical_device, comm_device, interface_config) - else: - self.lab_logger().warning(f"不支持的协议类型: {protocol_type}") - return - - self.lab_logger().info( - f"通信代理:为逻辑设备 {logical_device.device_id} 设置{protocol_type}通信代理 -> {comm_device_id}" - ) - - except Exception as e: - self.lab_logger().error(f"设置通信代理失败: {e}") + self.lab_logger().info(f"ROS2ProtocolNode {device_id} initialized with protocols: {self.protocol_names}") def _setup_protocol_names(self, protocol_type): # 处理协议类型 @@ -272,8 +158,7 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): node.resource_tracker = self.resource_tracker # 站内应当共享资源跟踪器 for action_name, action_mapping in node._action_value_mappings.items(): if action_name.startswith("auto-") or str(action_mapping.get("type", "")).startswith( - "UniLabJsonCommand" - ): + "UniLabJsonCommand"): continue action_id = f"/devices/{device_id_abs}/{action_name}" if action_id not in self._action_clients: @@ -358,10 +243,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): logs.append(step) elif isinstance(step, list): logs.append(step) - self.lab_logger().info( - f"Goal received: {protocol_kwargs}, running steps: " - f"{json.dumps(logs, indent=4, ensure_ascii=False)}" - ) + self.lab_logger().info(f"Goal received: {protocol_kwargs}, running steps: " + f"{json.dumps(logs, indent=4, ensure_ascii=False)}") time_start = time.time() time_overall = 100 @@ -419,12 +302,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): except Exception as e: # 捕获并记录错误信息 str_step_results = [ - { - k: dict(message_to_ordereddict(v)) if k == "result" and hasattr(v, "SLOT_TYPES") else v - for k, v in i.items() - } - for i in step_results - ] + {k: dict(message_to_ordereddict(v)) if k == "result" and hasattr(v, "SLOT_TYPES") else v for k, v in + i.items()} for i in step_results] execution_error = f"{traceback.format_exc()}\n\nStep Result: {pformat(str_step_results)}" execution_success = False self.lab_logger().error(f"协议 {protocol_name} 执行出错: {str(e)} \n{traceback.format_exc()}") @@ -493,95 +372,8 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): """还没有改过的部分""" - def _setup_modbus_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): - """设置Modbus通信代理""" - config = interface_config.get("config", {}) - - # 设置Modbus读写方法 - def modbus_read(address, count=1, function_code=3): - """Modbus读取方法""" - return comm_device.driver_instance.read_holding_registers( - address=address, count=count, slave_id=config.get("slave_id", 1) - ) - - def modbus_write(address, value, function_code=6): - """Modbus写入方法""" - if isinstance(value, (list, tuple)): - return comm_device.driver_instance.write_multiple_registers( - address=address, values=value, slave_id=config.get("slave_id", 1) - ) - else: - return comm_device.driver_instance.write_single_register( - address=address, value=value, slave_id=config.get("slave_id", 1) - ) - - # 绑定方法到逻辑设备 - setattr(logical_device.driver_instance, "comm_read", modbus_read) - setattr(logical_device.driver_instance, "comm_write", modbus_write) - setattr(logical_device.driver_instance, "comm_config", config) - setattr(logical_device.driver_instance, "comm_protocol", "modbus") - - def _setup_opcua_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): - """设置OPC UA通信代理""" - config = interface_config.get("config", {}) - - def opcua_read(node_id): - """OPC UA读取方法""" - return comm_device.driver_instance.read_node_value(node_id) - - def opcua_write(node_id, value): - """OPC UA写入方法""" - return comm_device.driver_instance.write_node_value(node_id, value) - - # 绑定方法到逻辑设备 - setattr(logical_device.driver_instance, "comm_read", opcua_read) - setattr(logical_device.driver_instance, "comm_write", opcua_write) - setattr(logical_device.driver_instance, "comm_config", config) - setattr(logical_device.driver_instance, "comm_protocol", "opcua") - - def _setup_http_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): - """设置HTTP/RPC通信代理""" - config = interface_config.get("config", {}) - base_url = config.get("base_url", "http://localhost:8080") - - def http_read(endpoint, params=None): - """HTTP GET请求""" - url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" - return comm_device.driver_instance.get_request(url, params=params) - - def http_write(endpoint, data): - """HTTP POST请求""" - url = f"{base_url.rstrip('/')}/{endpoint.lstrip('/')}" - return comm_device.driver_instance.post_request(url, data=data) - - # 绑定方法到逻辑设备 - setattr(logical_device.driver_instance, "comm_read", http_read) - setattr(logical_device.driver_instance, "comm_write", http_write) - setattr(logical_device.driver_instance, "comm_config", config) - setattr(logical_device.driver_instance, "comm_protocol", "http") - - def _setup_serial_proxy(self, logical_device: ROS2DeviceNode, comm_device: ROS2DeviceNode, interface_config): - """设置串口通信代理""" - config = interface_config.get("config", {}) - - def serial_read(timeout=1.0): - """串口读取方法""" - return comm_device.driver_instance.read_data(timeout=timeout) - - def serial_write(data): - """串口写入方法""" - if isinstance(data, str): - data = data.encode("utf-8") - return comm_device.driver_instance.write_data(data) - - # 绑定方法到逻辑设备 - setattr(logical_device.driver_instance, "comm_read", serial_read) - setattr(logical_device.driver_instance, "comm_write", serial_write) - setattr(logical_device.driver_instance, "comm_config", config) - setattr(logical_device.driver_instance, "comm_protocol", "serial") - def _setup_hardware_proxy( - self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method + self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method ): """为设备设置硬件接口代理""" # extra_info = [getattr(device.driver_instance, info) for info in communication_device.ros_node_instance._hardware_interface.get("extra_info", [])] @@ -618,4 +410,4 @@ class ROS2WorkstationNode(BaseROS2DeviceNode): ] await self._resource_clients["resource_update"].call_async(r) except Exception as e: - self.lab_logger().error(f"更新资源失败: {e}") + self.lab_logger().error(f"更新资源失败: {e}") \ No newline at end of file