diff --git a/unilabos/app/backend.py b/unilabos/app/backend.py index d43b9544..b2bc0af2 100644 --- a/unilabos/app/backend.py +++ b/unilabos/app/backend.py @@ -13,7 +13,7 @@ def start_backend( graph=None, controllers_config: dict = {}, bridges=[], - without_host: bool = False, + is_slave: bool = False, visual: str = "None", resources_mesh_config: dict = {}, **kwargs, @@ -32,7 +32,7 @@ def start_backend( raise ValueError(f"Unsupported backend: {backend}") backend_thread = threading.Thread( - target=main if not without_host else slave, + target=main if not is_slave else slave, args=( devices_config, resources_config, diff --git a/unilabos/app/register.py b/unilabos/app/register.py index f456183d..633df98f 100644 --- a/unilabos/app/register.py +++ b/unilabos/app/register.py @@ -1,11 +1,12 @@ import json import time +from typing import Optional, Tuple, Dict, Any from unilabos.utils.log import logger from unilabos.utils.type_check import TypeEncoder -def register_devices_and_resources(lab_registry): +def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]: """ 注册设备和资源到服务器(仅支持HTTP) """ @@ -28,6 +29,8 @@ def register_devices_and_resources(lab_registry): resources_to_register[resource_info["id"]] = resource_info logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}") + if gather_only: + return devices_to_register, resources_to_register # 注册设备 if devices_to_register: try: diff --git a/unilabos/ros/main_slave_run.py b/unilabos/ros/main_slave_run.py index d9ad3682..1ded6da1 100644 --- a/unilabos/ros/main_slave_run.py +++ b/unilabos/ros/main_slave_run.py @@ -6,11 +6,12 @@ from typing import Optional, Dict, Any, List import rclpy from unilabos_msgs.srv._serial_command import SerialCommand_Response +from unilabos.app.register import register_devices_and_resources from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher from unilabos_msgs.srv import SerialCommand # type: ignore -from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor +from rclpy.executors import MultiThreadedExecutor from rclpy.node import Node from rclpy.timer import Timer @@ -108,66 +109,51 @@ def slave( rclpy_init_args: List[str] = ["--log-level", "debug"], ) -> None: """从节点函数""" + # 1. 初始化 ROS2 if not rclpy.ok(): rclpy.init(args=rclpy_init_args) executor = rclpy.__executor if not executor: executor = rclpy.__executor = MultiThreadedExecutor() - devices_instances = {} - for device_config in devices_config.root_nodes: - device_id = device_config.res_content.id - if device_config.res_content.type != "device": - d = initialize_device_from_dict(device_id, device_config.get_nested_dict()) - devices_instances[device_id] = d - # 默认初始化 - # if d is not None and isinstance(d, Node): - # executor.add_node(d) - # else: - # print(f"Warning: Device {device_id} could not be initialized or is not a valid Node") - n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[]) - executor.add_node(n) - - if visual != "disable": - from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher - - resource_mesh_manager = ResourceMeshManager( - resources_mesh_config, - resources_config, # type: ignore FIXME - resource_tracker=DeviceNodeResourceTracker(), - device_id="resource_mesh_manager", - ) - joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker()) - - executor.add_node(resource_mesh_manager) - executor.add_node(joint_republisher) + # 1.5 启动 executor 线程 thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread") thread.start() + # 2. 创建 Slave Machine Node + n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[]) + executor.add_node(n) + + # 3. 向 Host 报送节点信息和物料,获取 UUID 映射 + uuid_mapping = {} if not BasicConfig.slave_no_host: + # 3.1 报送节点信息 sclient = n.create_client(SerialCommand, "/node_info_update") sclient.wait_for_service() + registry_config = {} + devices_to_register, resources_to_register = register_devices_and_resources(lab_registry, True) + registry_config.update(devices_to_register) + registry_config.update(resources_to_register) request = SerialCommand.Request() request.command = json.dumps( { "machine_name": BasicConfig.machine_name, "type": "slave", "devices_config": devices_config.dump(), - "registry_config": lab_registry.obtain_registry_device_info(), + "registry_config": registry_config, }, ensure_ascii=False, cls=TypeEncoder, ) - response = sclient.call_async(request).result() + sclient.call_async(request).result() logger.info(f"Slave node info updated.") - # 使用新的 c2s_update_resource_tree 服务 - rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree") - rclient.wait_for_service() - - # 序列化 ResourceTreeSet 为 JSON + # 3.2 报送物料树,获取 UUID 映射 if resources_config: + rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree") + rclient.wait_for_service() + request = SerialCommand.Request() request.command = json.dumps( { @@ -180,35 +166,61 @@ def slave( }, ensure_ascii=False, ) - tree_response: SerialCommand_Response = rclient.call_async(request).result() + tree_response: SerialCommand_Response = rclient.call(request) uuid_mapping = json.loads(tree_response.response) - # 创建反向映射:new_uuid -> old_uuid - reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()} - for node in resources_config.root_nodes: - if node.res_content.type == "device": - for sub_node in node.children: - # 只有二级子设备 - if sub_node.res_content.type != "device": - device_tracker = devices_instances[node.res_content.id].resource_tracker - # sub_node.res_content.uuid 已经是新UUID,需要用旧UUID去查找 - old_uuid = reverse_uuid_mapping.get(sub_node.res_content.uuid) - if old_uuid: - # 找到旧UUID,使用UUID查找 - resource_instance = device_tracker.figure_resource({"uuid": old_uuid}) - else: - # 未找到旧UUID,使用name查找 - resource_instance = device_tracker.figure_resource({"name": sub_node.res_content.name}) - device_tracker.loop_update_uuid(resource_instance, uuid_mapping) + logger.info(f"Slave resource tree added. UUID mapping: {len(uuid_mapping)} nodes") + + # 3.3 使用 UUID 映射更新 resources_config 的 UUID(参考 client.py 逻辑) + old_uuids = {node.res_content.uuid: node for node in resources_config.all_nodes} + for old_uuid, node in old_uuids.items(): + if old_uuid in uuid_mapping: + new_uuid = uuid_mapping[old_uuid] + node.res_content.uuid = new_uuid + # 更新所有子节点的 parent_uuid + for child in node.children: + child.res_content.parent_uuid = new_uuid else: - logger.error("Slave模式不允许新增非设备节点下的物料") - continue - if tree_response: - logger.info(f"Slave resource tree added. Response: {tree_response.response}") - else: - logger.warning("Slave resource tree add response is None") + logger.warning(f"资源UUID未更新: {old_uuid}") else: logger.info("No resources to add.") + # 4. 初始化所有设备实例(此时 resources_config 的 UUID 已更新) + devices_instances = {} + for device_config in devices_config.root_nodes: + device_id = device_config.res_content.id + if device_config.res_content.type == "device": + d = initialize_device_from_dict(device_id, device_config.get_nested_dict()) + if d is not None: + devices_instances[device_id] = d + logger.info(f"Device {device_id} initialized.") + else: + logger.warning(f"Device {device_id} initialization failed.") + + # 5. 如果启用可视化,创建可视化相关节点 + if visual != "disable": + from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher + + # 将 ResourceTreeSet 转换为 list 用于 visual 组件 + resources_list = ( + [node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes] + if resources_config + else [] + ) + resource_mesh_manager = ResourceMeshManager( + resources_mesh_config, + resources_list, + resource_tracker=DeviceNodeResourceTracker(), + device_id="resource_mesh_manager", + ) + joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker()) + lh_joint_pub = LiquidHandlerJointPublisher( + resources_config=resources_list, resource_tracker=DeviceNodeResourceTracker() + ) + executor.add_node(resource_mesh_manager) + executor.add_node(joint_republisher) + executor.add_node(lh_joint_pub) + + # 7. 保持运行 while True: time.sleep(1) diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index 346cf9c2..5c43a5d0 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -878,11 +878,10 @@ class HostNode(BaseROS2DeviceNode): success = False uuid_mapping = {} if len(self.bridges) > 0: - from unilabos.app.web.client import HTTPClient + from unilabos.app.web.client import HTTPClient, http_client - client: HTTPClient = self.bridges[-1] resource_start_time = time.time() - uuid_mapping = client.resource_tree_add(resource_tree_set, mount_uuid, first_add) + uuid_mapping = http_client.resource_tree_add(resource_tree_set, mount_uuid, first_add) success = True resource_end_time = time.time() self.lab_logger().info( @@ -990,9 +989,10 @@ class HostNode(BaseROS2DeviceNode): """ 更新节点信息回调 """ - self.lab_logger().info(f"[Host Node] Node info update request received: {request}") + # self.lab_logger().info(f"[Host Node] Node info update request received: {request}") try: from unilabos.app.communication import get_communication_client + from unilabos.app.web.client import HTTPClient, http_client info = json.loads(request.command) if "SYNC_SLAVE_NODE_INFO" in info: @@ -1001,10 +1001,10 @@ class HostNode(BaseROS2DeviceNode): edge_device_id = info["edge_device_id"] self.device_machine_names[edge_device_id] = machine_name else: - comm_client = get_communication_client() - registry_config = info["registry_config"] - for device_config in registry_config: - comm_client.publish_registry(device_config["id"], device_config) + devices_config = info.pop("devices_config") + registry_config = info.pop("registry_config") + if registry_config: + http_client.resource_registry({"resources": registry_config}) self.lab_logger().debug(f"[Host Node] Node info update: {info}") response.response = "OK" except Exception as e: @@ -1030,10 +1030,9 @@ class HostNode(BaseROS2DeviceNode): success = False if len(self.bridges) > 0: # 边的提交待定 - from unilabos.app.web.client import HTTPClient + from unilabos.app.web.client import HTTPClient, http_client - client: HTTPClient = self.bridges[-1] - r = client.resource_add(add_schema(resources)) + r = http_client.resource_add(add_schema(resources)) success = bool(r) response.success = success diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py index 085ce028..ce23a5be 100644 --- a/unilabos/ros/nodes/resource_tracker.py +++ b/unilabos/ros/nodes/resource_tracker.py @@ -1007,7 +1007,7 @@ class DeviceNodeResourceTracker(object): current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid") if current_uuid and current_uuid in self.uuid_to_resources: self.uuid_to_resources.pop(current_uuid) - logger.debug(f"移除资源UUID映射: {current_uuid} -> {res}") + logger.trace(f"移除资源UUID映射: {current_uuid} -> {res}") return 1 return 0 diff --git a/unilabos/utils/log.py b/unilabos/utils/log.py index 74442a62..3894233b 100644 --- a/unilabos/utils/log.py +++ b/unilabos/utils/log.py @@ -191,7 +191,8 @@ def configure_logger(loglevel=None): # 添加处理器到根日志记录器 root_logger.addHandler(console_handler) - + logging.getLogger("asyncio").setLevel(logging.INFO) + logging.getLogger("urllib3").setLevel(logging.INFO) # 配置日志系统 configure_logger()