feat: 多ProtocolNode 允许子设备ID相同

feat: 上报发现的ActionClient
feat: Host重启动,通过discover机制要求slaveNode重新注册,实现信息及时上报
This commit is contained in:
wznln
2025-05-01 14:36:15 +08:00
parent 7a51b2adc1
commit 9d2bfec1dd
7 changed files with 119 additions and 54 deletions

View File

@@ -96,6 +96,9 @@ def main():
# 设置BasicConfig参数 # 设置BasicConfig参数
BasicConfig.is_host_mode = not args_dict.get("without_host", False) BasicConfig.is_host_mode = not args_dict.get("without_host", False)
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
machine_name = os.popen("hostname").read().strip()
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
BasicConfig.machine_name = machine_name
from unilabos.resources.graphio import ( from unilabos.resources.graphio import (
read_node_link_json, read_node_link_json,

View File

@@ -168,11 +168,8 @@ class MQTTClient:
if self.mqtt_disable: if self.mqtt_disable:
return return
address = f"labs/{MQConfig.lab_id}/actions/" address = f"labs/{MQConfig.lab_id}/actions/"
action_type_name = action_info["title"] self.client.publish(address, json.dumps(action_info), qos=2)
action_info["title"] = action_id logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}")
action_data = json.dumps({action_type_name: action_info}, ensure_ascii=False)
self.client.publish(address, action_data, qos=2)
logger.debug(f"Action data published: address: {address}, {action_data}")
mqtt_client = MQTTClient() mqtt_client = MQTTClient()

View File

@@ -30,13 +30,13 @@ def get_host_node_info() -> Dict[str, Any]:
return host_info return host_info
host_info["available"] = True host_info["available"] = True
host_info["devices"] = { host_info["devices"] = {
device_id: { edge_device_id: {
"namespace": namespace, "namespace": namespace,
"is_online": f"{namespace}/{device_id}" in host_node._online_devices, "is_online": f"{namespace}/{edge_device_id}" in host_node._online_devices,
"key": f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}", "key": f"{namespace}/{edge_device_id}" if namespace.startswith("/") else f"/{namespace}/{edge_device_id}",
"machine_name": host_node.device_machine_names.get(device_id, "未知"), "machine_name": host_node.device_machine_names.get(edge_device_id, "未知"),
} }
for device_id, namespace in host_node.devices_names.items() for edge_device_id, namespace in host_node.devices_names.items()
} }
# 获取已订阅的主题 # 获取已订阅的主题
host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics)) host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics))

View File

@@ -12,6 +12,7 @@ class BasicConfig:
config_path = "" config_path = ""
is_host_mode = True # 从registry.py移动过来 is_host_mode = True # 从registry.py移动过来
slave_no_host = False # 是否跳过rclient.wait_for_service() slave_no_host = False # 是否跳过rclient.wait_for_service()
machine_name = "undefined"
# MQTT配置 # MQTT配置

View File

@@ -91,9 +91,7 @@ def slave(
# else: # else:
# print(f"Warning: Device {device_id} could not be initialized or is not a valid Node") # print(f"Warning: Device {device_id} could not be initialized or is not a valid Node")
machine_name = os.popen("hostname").read().strip() n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
n = Node(f"slaveMachine_{machine_name}", parameter_overrides=[])
executor.add_node(n) executor.add_node(n)
thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread") thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
@@ -105,7 +103,7 @@ def slave(
request = SerialCommand.Request() request = SerialCommand.Request()
request.command = json.dumps({ request.command = json.dumps({
"machine_name": machine_name, "machine_name": BasicConfig.machine_name,
"type": "slave", "type": "slave",
"devices_config": devices_config_copy, "devices_config": devices_config_copy,
"registry_config": lab_registry.obtain_registry_device_info() "registry_config": lab_registry.obtain_registry_device_info()

View File

@@ -1,3 +1,4 @@
import json
import threading import threading
import time import time
import traceback import traceback
@@ -13,15 +14,17 @@ from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle from rclpy.action.server import ServerGoalHandle
from rclpy.client import Client from rclpy.client import Client
from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.service import Service
from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type
from unilabos.ros.msgs.message_converter import ( from unilabos.ros.msgs.message_converter import (
convert_to_ros_msg, convert_to_ros_msg,
convert_from_ros_msg, convert_from_ros_msg,
convert_from_ros_msg_with_mapping, convert_from_ros_msg_with_mapping,
convert_to_ros_msg_with_mapping, convert_to_ros_msg_with_mapping, ros_action_to_json_schema,
) )
from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList # type: ignore from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \
SerialCommand # type: ignore
from unilabos_msgs.msg import Resource # type: ignore from unilabos_msgs.msg import Resource # type: ignore
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker
@@ -29,7 +32,7 @@ from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator
from unilabos.utils.async_util import run_async_func from unilabos.utils.async_util import run_async_func
from unilabos.utils.log import info, debug, warning, error, critical, logger from unilabos.utils.log import info, debug, warning, error, critical, logger
from unilabos.utils.type_check import get_type_class from unilabos.utils.type_check import get_type_class, TypeEncoder
T = TypeVar("T") T = TypeVar("T")
@@ -44,19 +47,17 @@ class ROSLoggerAdapter:
@property @property
def identifier(self): def identifier(self):
return f"{self.namespace}/{self.node_name}" return f"{self.namespace}"
def __init__(self, ros_logger, node_name, namespace): def __init__(self, ros_logger, namespace):
""" """
初始化日志适配器 初始化日志适配器
Args: Args:
ros_logger: ROS2日志记录器 ros_logger: ROS2日志记录器
node_name: 节点名称
namespace: 命名空间 namespace: 命名空间
""" """
self.ros_logger = ros_logger self.ros_logger = ros_logger
self.node_name = node_name
self.namespace = namespace self.namespace = namespace
self.level_2_logger_func = { self.level_2_logger_func = {
"info": info, "info": info,
@@ -258,9 +259,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.lab_logger().critical("资源跟踪器未初始化,请检查") self.lab_logger().critical("资源跟踪器未初始化,请检查")
# 创建自定义日志记录器 # 创建自定义日志记录器
self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.node_name, self.namespace) self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.namespace)
self._action_servers = {} self._action_servers: Dict[str, ActionServer] = {}
self._property_publishers = {} self._property_publishers = {}
self._status_types = status_types self._status_types = status_types
self._action_value_mappings = action_value_mappings self._action_value_mappings = action_value_mappings
@@ -284,7 +285,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.create_ros_action_server(action_name, action_value_mapping) self.create_ros_action_server(action_name, action_value_mapping)
# 创建线程池执行器 # 创建线程池执行器
self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1)) self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}")
# 创建资源管理客户端 # 创建资源管理客户端
self._resource_clients: Dict[str, Client] = { self._resource_clients: Dict[str, Client] = {
@@ -295,6 +296,18 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"resource_list": self.create_client(ResourceList, "/resources/list"), "resource_list": self.create_client(ResourceList, "/resources/list"),
} }
def query_host_name_cb(req, res):
self.register_device()
self.lab_logger().info("Host要求重新注册当前节点")
res.response = ""
return res
self._service_server: Dict[str, Service] = {
"query_host_name": self.create_service(
SerialCommand, f"/srv{self.namespace}/query_host_name", query_host_name_cb, callback_group=self.callback_group
),
}
# 向全局在线设备注册表添加设备信息 # 向全局在线设备注册表添加设备信息
self.register_device() self.register_device()
rclpy.get_global_executor().add_node(self) rclpy.get_global_executor().add_node(self)
@@ -318,6 +331,31 @@ class BaseROS2DeviceNode(Node, Generic[T]):
) )
# 加入全局注册表 # 加入全局注册表
registered_devices[self.device_id] = device_info registered_devices[self.device_id] = device_info
from unilabos.config.config import BasicConfig
if not BasicConfig.is_host_mode:
sclient = self.create_client(SerialCommand, "/node_info_update")
# 启动线程执行发送任务
threading.Thread(
target=self.send_slave_node_info,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_send_slave_node_info"
).start()
def send_slave_node_info(self, sclient):
sclient.wait_for_service()
request = SerialCommand.Request()
from unilabos.config.config import BasicConfig
request.command = json.dumps({
"SYNC_SLAVE_NODE_INFO": {
"machine_name": BasicConfig.machine_name,
"type": "slave",
"edge_device_id": self.device_id
}}, ensure_ascii=False, cls=TypeEncoder)
# 发送异步请求并等待结果
future = sclient.call_async(request)
response = future.result()
def lab_logger(self): def lab_logger(self):
""" """

View File

@@ -27,6 +27,7 @@ from unilabos.ros.msgs.message_converter import (
) )
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.ros.nodes.presets.controller_node import ControllerNode from unilabos.ros.nodes.presets.controller_node import ControllerNode
from unilabos.utils.type_check import TypeEncoder
class HostNode(BaseROS2DeviceNode): class HostNode(BaseROS2DeviceNode):
@@ -98,7 +99,7 @@ class HostNode(BaseROS2DeviceNode):
# 创建设备、动作客户端和目标存储 # 创建设备、动作客户端和目标存储
self.devices_names: Dict[str, str] = {} # 存储设备名称和命名空间的映射 self.devices_names: Dict[str, str] = {} # 存储设备名称和命名空间的映射
self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例 self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例
self.device_machine_names: Dict[str, str] = {} # 存储设备ID到机器名称的映射 self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射
self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例 self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = ( self._action_value_mappings: Dict[str, Dict] = (
{} {}
@@ -160,6 +161,13 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info("[Host Node] Host node initialized.") self.lab_logger().info("[Host Node] Host node initialized.")
HostNode._ready_event.set() HostNode._ready_event.set()
def _send_re_register(self, sclient):
sclient.wait_for_service()
request = SerialCommand.Request()
request.command = ""
future = sclient.call_async(request)
response = future.result()
def _discover_devices(self) -> None: def _discover_devices(self) -> None:
""" """
发现网络中的设备 发现网络中的设备
@@ -176,23 +184,37 @@ class HostNode(BaseROS2DeviceNode):
current_devices = set() current_devices = set()
for device_id, namespace in nodes_and_names: for device_id, namespace in nodes_and_names:
if not namespace.startswith("/devices"): if not namespace.startswith("/devices/"):
continue continue
edge_device_id = namespace[9:]
# 将设备添加到当前设备集合 # 将设备添加到当前设备集合
device_key = f"{namespace}/{device_id}" device_key = f"{namespace}/{edge_device_id}" # namespace已经包含device_id了这里复写一遍
current_devices.add(device_key) current_devices.add(device_key)
# 如果是新设备记录并创建ActionClient # 如果是新设备记录并创建ActionClient
if device_id not in self.devices_names: if edge_device_id not in self.devices_names:
self.lab_logger().info(f"[Host Node] Discovered new device: {device_key}") self.lab_logger().info(f"[Host Node] Discovered new device: {device_key}")
self.devices_names[device_id] = namespace self.devices_names[edge_device_id] = namespace
self._create_action_clients_for_device(device_id, namespace) self._create_action_clients_for_device(device_id, namespace)
self._online_devices.add(device_key) self._online_devices.add(device_key)
sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name")
threading.Thread(
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
).start()
elif device_key not in self._online_devices: elif device_key not in self._online_devices:
# 设备重新上线 # 设备重新上线
self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}") self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}")
self._online_devices.add(device_key) self._online_devices.add(device_key)
sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name")
threading.Thread(
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
).start()
# 检测离线设备 # 检测离线设备
offline_devices = self._online_devices - current_devices offline_devices = self._online_devices - current_devices
@@ -234,17 +256,22 @@ class HostNode(BaseROS2DeviceNode):
self._action_clients[action_id] = ActionClient( self._action_clients[action_id] = ActionClient(
self, action_type, action_id, callback_group=self.callback_group self, action_type, action_id, callback_group=self.callback_group
) )
self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}")
action_name = action_id[len(namespace) + 1:]
edge_device_id = namespace[9:]
from unilabos.app.mq import mqtt_client from unilabos.app.mq import mqtt_client
info_with_schema = ros_action_to_json_schema(action_type) info_with_schema = ros_action_to_json_schema(action_type)
mqtt_client.publish_actions(action_id, info_with_schema) mqtt_client.publish_actions(action_name, {
"device_id": edge_device_id,
"action_name": action_name,
"schema": info_with_schema,
})
except Exception as e: except Exception as e:
self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}") self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}")
def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None: def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None:
""" """
根据配置初始化设备 根据配置初始化设备
此函数根据提供的设备配置动态导入适当的设备类并创建其实例。 此函数根据提供的设备配置动态导入适当的设备类并创建其实例。
同时为设备的动作值映射设置动作客户端。 同时为设备的动作值映射设置动作客户端。
@@ -260,7 +287,7 @@ class HostNode(BaseROS2DeviceNode):
if d is None: if d is None:
return return
# noinspection PyProtectedMember # noinspection PyProtectedMember
self.devices_names[device_id] = d._ros_node.namespace self.devices_names[device_id] = d._ros_node.namespace # 这里不涉及二级device_id
self.device_machine_names[device_id] = "本地" self.device_machine_names[device_id] = "本地"
self.devices_instances[device_id] = d self.devices_instances[device_id] = d
# noinspection PyProtectedMember # noinspection PyProtectedMember
@@ -269,14 +296,17 @@ class HostNode(BaseROS2DeviceNode):
if action_id not in self._action_clients: if action_id not in self._action_clients:
action_type = action_value_mapping["type"] action_type = action_value_mapping["type"]
self._action_clients[action_id] = ActionClient(self, action_type, action_id) self._action_clients[action_id] = ActionClient(self, action_type, action_id)
self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的
from unilabos.app.mq import mqtt_client from unilabos.app.mq import mqtt_client
info_with_schema = ros_action_to_json_schema(action_type) info_with_schema = ros_action_to_json_schema(action_type)
mqtt_client.publish_actions(action_id, info_with_schema) mqtt_client.publish_actions(action_name, {
"device_id": device_id,
"action_name": action_name,
"schema": info_with_schema,
})
else: else:
self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.") self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.")
device_key = f"{self.devices_names[device_id]}/{device_id}" device_key = f"{self.devices_names[device_id]}/{device_id}" # 这里不涉及二级device_id
# 添加到在线设备列表 # 添加到在线设备列表
self._online_devices.add(device_key) self._online_devices.add(device_key)
@@ -298,8 +328,8 @@ class HostNode(BaseROS2DeviceNode):
# 解析设备名和属性名 # 解析设备名和属性名
parts = topic.split("/") parts = topic.split("/")
if len(parts) >= 4: if len(parts) >= 4: # 可能有ProtocolNode创建更长的设备
device_id = parts[-2] device_id = "/".join(parts[2:-1])
property_name = parts[-1] property_name = parts[-1]
# 初始化设备状态字典 # 初始化设备状态字典
@@ -526,21 +556,19 @@ class HostNode(BaseROS2DeviceNode):
from unilabos.app.mq import mqtt_client from unilabos.app.mq import mqtt_client
info = json.loads(request.command) info = json.loads(request.command)
if "SYNC_SLAVE_NODE_INFO" in info:
info = info["SYNC_SLAVE_NODE_INFO"]
machine_name = info["machine_name"] machine_name = info["machine_name"]
devices_config = info["devices_config"] edge_device_id = info["edge_device_id"]
self.device_machine_names[edge_device_id] = machine_name
else:
registry_config = info["registry_config"] registry_config = info["registry_config"]
# 更新设备机器名称映射
for device_id in devices_config.keys():
self.device_machine_names[device_id] = machine_name
self.lab_logger().debug(f"[Host Node] Updated machine name for device {device_id}: {machine_name}")
for device_config in registry_config: for device_config in registry_config:
mqtt_client.publish_registry(device_config["id"], device_config) mqtt_client.publish_registry(device_config["id"], device_config)
self.lab_logger().info(f"[Host Node] Node info update: {info}") self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK" response.response = "OK"
except Exception as e: except Exception as e:
self.lab_logger().error(f"[Host Node] Error updating node info: {str(e)}") self.lab_logger().error(f"[Host Node] Error updating node info: {e.args}")
response.response = "ERROR" response.response = "ERROR"
return response return response