diff --git a/unilabos/app/main.py b/unilabos/app/main.py index 8b87736f..f75a3295 100644 --- a/unilabos/app/main.py +++ b/unilabos/app/main.py @@ -96,6 +96,9 @@ def main(): # 设置BasicConfig参数 BasicConfig.is_host_mode = not args_dict.get("without_host", False) BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) + machine_name = os.popen("hostname").read().strip() + machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) + BasicConfig.machine_name = machine_name from unilabos.resources.graphio import ( read_node_link_json, diff --git a/unilabos/app/mq.py b/unilabos/app/mq.py index f604c72d..a6123fb2 100644 --- a/unilabos/app/mq.py +++ b/unilabos/app/mq.py @@ -168,11 +168,8 @@ class MQTTClient: if self.mqtt_disable: return address = f"labs/{MQConfig.lab_id}/actions/" - action_type_name = action_info["title"] - action_info["title"] = action_id - action_data = json.dumps({action_type_name: action_info}, ensure_ascii=False) - self.client.publish(address, action_data, qos=2) - logger.debug(f"Action data published: address: {address}, {action_data}") + self.client.publish(address, json.dumps(action_info), qos=2) + logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}") mqtt_client = MQTTClient() diff --git a/unilabos/app/web/utils/host_utils.py b/unilabos/app/web/utils/host_utils.py index 6ceb068a..a9070486 100644 --- a/unilabos/app/web/utils/host_utils.py +++ b/unilabos/app/web/utils/host_utils.py @@ -30,13 +30,13 @@ def get_host_node_info() -> Dict[str, Any]: return host_info host_info["available"] = True host_info["devices"] = { - device_id: { + edge_device_id: { "namespace": namespace, - "is_online": f"{namespace}/{device_id}" in host_node._online_devices, - "key": f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}", - "machine_name": host_node.device_machine_names.get(device_id, "未知"), + "is_online": f"{namespace}/{edge_device_id}" in host_node._online_devices, + "key": f"{namespace}/{edge_device_id}" if namespace.startswith("/") else f"/{namespace}/{edge_device_id}", + "machine_name": host_node.device_machine_names.get(edge_device_id, "未知"), } - for device_id, namespace in host_node.devices_names.items() + for edge_device_id, namespace in host_node.devices_names.items() } # 获取已订阅的主题 host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics)) diff --git a/unilabos/config/config.py b/unilabos/config/config.py index 5c1f7b9c..3f3d8dd7 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -12,6 +12,7 @@ class BasicConfig: config_path = "" is_host_mode = True # 从registry.py移动过来 slave_no_host = False # 是否跳过rclient.wait_for_service() + machine_name = "undefined" # MQTT配置 diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py index 15265c4d..9ac96748 100644 --- a/unilabos/ros/main_slave_run.py +++ b/unilabos/ros/main_slave_run.py @@ -91,9 +91,7 @@ def slave( # else: # print(f"Warning: Device {device_id} could not be initialized or is not a valid Node") - machine_name = os.popen("hostname").read().strip() - machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name]) - n = Node(f"slaveMachine_{machine_name}", parameter_overrides=[]) + n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[]) executor.add_node(n) thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread") @@ -105,7 +103,7 @@ def slave( request = SerialCommand.Request() request.command = json.dumps({ - "machine_name": machine_name, + "machine_name": BasicConfig.machine_name, "type": "slave", "devices_config": devices_config_copy, "registry_config": lab_registry.obtain_registry_device_info() diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 0ff03a68..7e032064 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -1,3 +1,4 @@ +import json import threading import time import traceback @@ -13,15 +14,17 @@ from rclpy.action import ActionServer from rclpy.action.server import ServerGoalHandle from rclpy.client import Client from rclpy.callback_groups import ReentrantCallbackGroup +from rclpy.service import Service from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type from unilabos.ros.msgs.message_converter import ( convert_to_ros_msg, convert_from_ros_msg, convert_from_ros_msg_with_mapping, - convert_to_ros_msg_with_mapping, + convert_to_ros_msg_with_mapping, ros_action_to_json_schema, ) -from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList # type: ignore +from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \ + SerialCommand # type: ignore from unilabos_msgs.msg import Resource # type: ignore from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker @@ -29,7 +32,7 @@ from unilabos.ros.x.rclpyx import get_event_loop from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.utils.async_util import run_async_func from unilabos.utils.log import info, debug, warning, error, critical, logger -from unilabos.utils.type_check import get_type_class +from unilabos.utils.type_check import get_type_class, TypeEncoder T = TypeVar("T") @@ -44,19 +47,17 @@ class ROSLoggerAdapter: @property def identifier(self): - return f"{self.namespace}/{self.node_name}" + return f"{self.namespace}" - def __init__(self, ros_logger, node_name, namespace): + def __init__(self, ros_logger, namespace): """ 初始化日志适配器 Args: ros_logger: ROS2日志记录器 - node_name: 节点名称 namespace: 命名空间 """ self.ros_logger = ros_logger - self.node_name = node_name self.namespace = namespace self.level_2_logger_func = { "info": info, @@ -258,9 +259,9 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.lab_logger().critical("资源跟踪器未初始化,请检查") # 创建自定义日志记录器 - self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.node_name, self.namespace) + self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.namespace) - self._action_servers = {} + self._action_servers: Dict[str, ActionServer] = {} self._property_publishers = {} self._status_types = status_types self._action_value_mappings = action_value_mappings @@ -284,7 +285,7 @@ class BaseROS2DeviceNode(Node, Generic[T]): self.create_ros_action_server(action_name, action_value_mapping) # 创建线程池执行器 - self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1)) + self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}") # 创建资源管理客户端 self._resource_clients: Dict[str, Client] = { @@ -295,6 +296,18 @@ class BaseROS2DeviceNode(Node, Generic[T]): "resource_list": self.create_client(ResourceList, "/resources/list"), } + def query_host_name_cb(req, res): + self.register_device() + self.lab_logger().info("Host要求重新注册当前节点") + res.response = "" + return res + + self._service_server: Dict[str, Service] = { + "query_host_name": self.create_service( + SerialCommand, f"/srv{self.namespace}/query_host_name", query_host_name_cb, callback_group=self.callback_group + ), + } + # 向全局在线设备注册表添加设备信息 self.register_device() rclpy.get_global_executor().add_node(self) @@ -318,6 +331,31 @@ class BaseROS2DeviceNode(Node, Generic[T]): ) # 加入全局注册表 registered_devices[self.device_id] = device_info + from unilabos.config.config import BasicConfig + if not BasicConfig.is_host_mode: + sclient = self.create_client(SerialCommand, "/node_info_update") + # 启动线程执行发送任务 + threading.Thread( + target=self.send_slave_node_info, + args=(sclient,), + daemon=True, + name=f"ROSDevice{self.device_id}_send_slave_node_info" + ).start() + + def send_slave_node_info(self, sclient): + sclient.wait_for_service() + request = SerialCommand.Request() + from unilabos.config.config import BasicConfig + request.command = json.dumps({ + "SYNC_SLAVE_NODE_INFO": { + "machine_name": BasicConfig.machine_name, + "type": "slave", + "edge_device_id": self.device_id + }}, ensure_ascii=False, cls=TypeEncoder) + + # 发送异步请求并等待结果 + future = sclient.call_async(request) + response = future.result() def lab_logger(self): """ diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 2c114412..5a739773 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -27,6 +27,7 @@ from unilabos.ros.msgs.message_converter import ( ) from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.presets.controller_node import ControllerNode +from unilabos.utils.type_check import TypeEncoder class HostNode(BaseROS2DeviceNode): @@ -98,7 +99,7 @@ class HostNode(BaseROS2DeviceNode): # 创建设备、动作客户端和目标存储 self.devices_names: Dict[str, str] = {} # 存储设备名称和命名空间的映射 self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例 - self.device_machine_names: Dict[str, str] = {} # 存储设备ID到机器名称的映射 + self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射 self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例 self._action_value_mappings: Dict[str, Dict] = ( {} @@ -160,6 +161,13 @@ class HostNode(BaseROS2DeviceNode): self.lab_logger().info("[Host Node] Host node initialized.") HostNode._ready_event.set() + def _send_re_register(self, sclient): + sclient.wait_for_service() + request = SerialCommand.Request() + request.command = "" + future = sclient.call_async(request) + response = future.result() + def _discover_devices(self) -> None: """ 发现网络中的设备 @@ -176,23 +184,37 @@ class HostNode(BaseROS2DeviceNode): current_devices = set() for device_id, namespace in nodes_and_names: - if not namespace.startswith("/devices"): + if not namespace.startswith("/devices/"): continue - + edge_device_id = namespace[9:] # 将设备添加到当前设备集合 - device_key = f"{namespace}/{device_id}" + device_key = f"{namespace}/{edge_device_id}" # namespace已经包含device_id了,这里复写一遍 current_devices.add(device_key) # 如果是新设备,记录并创建ActionClient - if device_id not in self.devices_names: + if edge_device_id not in self.devices_names: self.lab_logger().info(f"[Host Node] Discovered new device: {device_key}") - self.devices_names[device_id] = namespace + self.devices_names[edge_device_id] = namespace self._create_action_clients_for_device(device_id, namespace) self._online_devices.add(device_key) + sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name") + threading.Thread( + target=self._send_re_register, + args=(sclient,), + daemon=True, + name=f"ROSDevice{self.device_id}_query_host_name_{namespace}" + ).start() elif device_key not in self._online_devices: # 设备重新上线 self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}") self._online_devices.add(device_key) + sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name") + threading.Thread( + target=self._send_re_register, + args=(sclient,), + daemon=True, + name=f"ROSDevice{self.device_id}_query_host_name_{namespace}" + ).start() # 检测离线设备 offline_devices = self._online_devices - current_devices @@ -234,17 +256,22 @@ class HostNode(BaseROS2DeviceNode): self._action_clients[action_id] = ActionClient( self, action_type, action_id, callback_group=self.callback_group ) - self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") + self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}") + action_name = action_id[len(namespace) + 1:] + edge_device_id = namespace[9:] from unilabos.app.mq import mqtt_client - info_with_schema = ros_action_to_json_schema(action_type) - mqtt_client.publish_actions(action_id, info_with_schema) + mqtt_client.publish_actions(action_name, { + "device_id": edge_device_id, + "action_name": action_name, + "schema": info_with_schema, + }) except Exception as e: self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}") def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None: """ - 根据配置初始化设备 + 根据配置初始化设备, 此函数根据提供的设备配置动态导入适当的设备类并创建其实例。 同时为设备的动作值映射设置动作客户端。 @@ -260,7 +287,7 @@ class HostNode(BaseROS2DeviceNode): if d is None: return # noinspection PyProtectedMember - self.devices_names[device_id] = d._ros_node.namespace + self.devices_names[device_id] = d._ros_node.namespace # 这里不涉及二级device_id self.device_machine_names[device_id] = "本地" self.devices_instances[device_id] = d # noinspection PyProtectedMember @@ -269,14 +296,17 @@ class HostNode(BaseROS2DeviceNode): if action_id not in self._action_clients: action_type = action_value_mapping["type"] self._action_clients[action_id] = ActionClient(self, action_type, action_id) - self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") + self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的 from unilabos.app.mq import mqtt_client - info_with_schema = ros_action_to_json_schema(action_type) - mqtt_client.publish_actions(action_id, info_with_schema) + mqtt_client.publish_actions(action_name, { + "device_id": device_id, + "action_name": action_name, + "schema": info_with_schema, + }) else: self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.") - device_key = f"{self.devices_names[device_id]}/{device_id}" + device_key = f"{self.devices_names[device_id]}/{device_id}" # 这里不涉及二级device_id # 添加到在线设备列表 self._online_devices.add(device_key) @@ -298,8 +328,8 @@ class HostNode(BaseROS2DeviceNode): # 解析设备名和属性名 parts = topic.split("/") - if len(parts) >= 4: - device_id = parts[-2] + if len(parts) >= 4: # 可能有ProtocolNode,创建更长的设备 + device_id = "/".join(parts[2:-1]) property_name = parts[-1] # 初始化设备状态字典 @@ -526,21 +556,19 @@ class HostNode(BaseROS2DeviceNode): from unilabos.app.mq import mqtt_client info = json.loads(request.command) - machine_name = info["machine_name"] - devices_config = info["devices_config"] - registry_config = info["registry_config"] - - # 更新设备机器名称映射 - for device_id in devices_config.keys(): - self.device_machine_names[device_id] = machine_name - self.lab_logger().debug(f"[Host Node] Updated machine name for device {device_id}: {machine_name}") - - for device_config in registry_config: - mqtt_client.publish_registry(device_config["id"], device_config) - self.lab_logger().info(f"[Host Node] Node info update: {info}") + if "SYNC_SLAVE_NODE_INFO" in info: + info = info["SYNC_SLAVE_NODE_INFO"] + machine_name = info["machine_name"] + edge_device_id = info["edge_device_id"] + self.device_machine_names[edge_device_id] = machine_name + else: + registry_config = info["registry_config"] + for device_config in registry_config: + mqtt_client.publish_registry(device_config["id"], device_config) + self.lab_logger().debug(f"[Host Node] Node info update: {info}") response.response = "OK" except Exception as e: - self.lab_logger().error(f"[Host Node] Error updating node info: {str(e)}") + self.lab_logger().error(f"[Host Node] Error updating node info: {e.args}") response.response = "ERROR" return response