mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-15 04:05:10 +00:00
Compare commits
3 Commits
d297abfd19
...
34151f5cb2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34151f5cb2 | ||
|
|
369a21b904 | ||
|
|
90169981c1 |
@@ -5,6 +5,6 @@ io_snrd:
|
|||||||
type: python
|
type: python
|
||||||
hardware_interface:
|
hardware_interface:
|
||||||
name: modbus_client
|
name: modbus_client
|
||||||
extra_info: address
|
extra_info: []
|
||||||
read: read_io_coil
|
read: read_io_coil
|
||||||
write: write_io_coil
|
write: write_io_coil
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
import time
|
import time
|
||||||
import asyncio
|
import asyncio
|
||||||
import traceback
|
import traceback
|
||||||
|
from types import MethodType
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
|
||||||
import rclpy
|
import rclpy
|
||||||
@@ -87,7 +88,10 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
|
|||||||
|
|
||||||
# 如果硬件接口是字符串,通过通信设备提供
|
# 如果硬件接口是字符串,通过通信设备提供
|
||||||
if isinstance(name, str) and name in self.sub_devices:
|
if isinstance(name, str) and name in self.sub_devices:
|
||||||
|
communicate_device = self.sub_devices[name]
|
||||||
|
communicate_hardware_info = communicate_device.ros_node_instance._hardware_interface
|
||||||
self._setup_hardware_proxy(d, self.sub_devices[name], read, write)
|
self._setup_hardware_proxy(d, self.sub_devices[name], read, write)
|
||||||
|
self.lab_logger().info(f"\n通信代理:为子设备{device_id}\n 添加了{read}方法(来源:{name} {communicate_hardware_info['write']}) \n 添加了{write}方法(来源:{name} {communicate_hardware_info['read']})")
|
||||||
|
|
||||||
def _setup_protocol_names(self, protocol_type):
|
def _setup_protocol_names(self, protocol_type):
|
||||||
# 处理协议类型
|
# 处理协议类型
|
||||||
@@ -241,20 +245,23 @@ class ROS2ProtocolNode(BaseROS2DeviceNode):
|
|||||||
|
|
||||||
def _setup_hardware_proxy(self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method):
|
def _setup_hardware_proxy(self, device: ROS2DeviceNode, communication_device: ROS2DeviceNode, read_method, write_method):
|
||||||
"""为设备设置硬件接口代理"""
|
"""为设备设置硬件接口代理"""
|
||||||
extra_info = [getattr(device.driver_instance, info) for info in communication_device.ros_node_instance._hardware_interface.get("extra_info", [])]
|
# extra_info = [getattr(device.driver_instance, info) for info in communication_device.ros_node_instance._hardware_interface.get("extra_info", [])]
|
||||||
write_func = getattr(communication_device.ros_node_instance, communication_device.ros_node_instance._hardware_interface["write"])
|
write_func = getattr(communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["write"])
|
||||||
read_func = getattr(communication_device.ros_node_instance, communication_device.ros_node_instance._hardware_interface["read"])
|
read_func = getattr(communication_device.driver_instance, communication_device.ros_node_instance._hardware_interface["read"])
|
||||||
|
|
||||||
def _read():
|
def _read(*args, **kwargs):
|
||||||
return read_func(*extra_info)
|
return read_func(*args, **kwargs)
|
||||||
|
|
||||||
def _write(command):
|
def _write(*args, **kwargs):
|
||||||
return write_func(*extra_info, command)
|
return write_func(*args, **kwargs)
|
||||||
|
|
||||||
if read_method:
|
if read_method:
|
||||||
setattr(device.driver_instance, read_method, _read)
|
bound_read = MethodType(_read, device.driver_instance)
|
||||||
|
setattr(device.driver_instance, read_method, bound_read)
|
||||||
|
|
||||||
if write_method:
|
if write_method:
|
||||||
setattr(device.driver_instance, write_method, _write)
|
bound_write = MethodType(_write, device.driver_instance)
|
||||||
|
setattr(device.driver_instance, write_method, bound_write)
|
||||||
|
|
||||||
|
|
||||||
async def _update_resources(self, goal, protocol_kwargs):
|
async def _update_resources(self, goal, protocol_kwargs):
|
||||||
|
|||||||
Reference in New Issue
Block a user