import json import os from pathlib import Path import re import yaml from launch import LaunchService from launch import LaunchDescription from launch_ros.actions import Node as nd import xacro from lxml import etree from launch_param_builder import load_yaml from launch_ros.parameter_descriptions import ParameterFile from unilabos.registry.registry import lab_registry from ament_index_python.packages import get_package_share_directory def get_pattern_matches(folder, pattern): """Given all the files in the folder, find those that match the pattern. If there are groups defined, the groups are returned. Otherwise the path to the matches are returned. """ matches = [] if not folder.exists(): return matches for child in folder.iterdir(): if not child.is_file(): continue m = pattern.search(child.name) if m: groups = m.groups() if groups: matches.append(groups[0]) else: matches.append(child) return matches class ResourceVisualization: def __init__(self, device: dict, resource: dict, enable_rviz: bool = True): """初始化资源可视化类 该类用于将设备和资源的3D模型可视化展示。通过解析设备和资源的配置信息, 从注册表中获取对应的3D模型文件,并使用ROS2和RViz进行可视化。 Args: device (dict): 设备配置字典,包含设备的类型、位置等信息 resource (dict): 资源配置字典,包含资源的类型、位置等信息 registry (dict): 注册表字典,包含设备和资源类型的注册信息 enable_rviz (bool, optional): 是否启用RViz可视化. Defaults to True. """ self.launch_service = LaunchService() self.launch_description = LaunchDescription() self.resource_dict = resource self.resource_model = {} self.resource_type = ['deck', 'plate', 'container'] self.mesh_path = Path(__file__).parent.absolute() self.enable_rviz = enable_rviz registry = lab_registry self.srdf_str = ''' ''' self.robot_state_str= ''' ''' self.root = etree.fromstring(self.robot_state_str) self.root_srdf = etree.fromstring(self.srdf_str) xacro_uri = self.root.nsmap["xacro"] self.moveit_nodes = {} self.moveit_nodes_kinematics = {} self.moveit_controllers_yaml = { "moveit_controller_manager": "moveit_simple_controller_manager/MoveItSimpleControllerManager", "moveit_simple_controller_manager": { "controller_names": [] } } self.ros2_controllers_yaml = { "controller_manager": { "ros__parameters": { "update_rate": 100, "joint_state_broadcaster": { "type": "joint_state_broadcaster/JointStateBroadcaster", } } } } # 遍历设备节点 for node in device.values(): if node['type'] in self.resource_type or (node['type'] == 'device' and node['class'] != ''): model_config = {} if node['type'] in self.resource_type: resource_class = node['class'] if resource_class not in registry.resource_type_registry.keys(): raise ValueError(f"资源类型 {resource_class} 未在注册表中注册") elif "model" in registry.resource_type_registry[resource_class].keys(): model_config = registry.resource_type_registry[resource_class]['model'] elif node['type'] == 'device' and node['class'] != '': device_class = node['class'] if device_class not in registry.device_type_registry.keys(): raise ValueError(f"设备类型 {device_class} 未在注册表中注册") elif "model" in registry.device_type_registry[device_class].keys(): model_config = registry.device_type_registry[device_class]['model'] if model_config: if model_config['type'] == 'resource': self.resource_model[node['id']] = { 'mesh': f"{str(self.mesh_path)}/resources/{model_config['mesh']}", 'mesh_tf': model_config['mesh_tf']} if 'children_mesh' in model_config: if model_config['children_mesh'] is not None: self.resource_model[f"{node['id']}_"] = { 'mesh': f"{str(self.mesh_path)}/resources/{model_config['children_mesh']}", 'mesh_tf': model_config['children_mesh_tf'] } elif model_config['type'] == 'device': new_include = etree.SubElement(self.root, f"{{{xacro_uri}}}include") new_include.set("filename", f"{str(self.mesh_path)}/devices/{model_config['mesh']}/macro_device.xacro") new_dev = etree.SubElement(self.root, f"{{{xacro_uri}}}{model_config['mesh']}") new_dev.set("parent_link", "world") new_dev.set("mesh_path", str(self.mesh_path)) new_dev.set("device_name", node["id"]+"_") # if node["parent"] is not None: # new_dev.set("station_name", node["parent"]+'_') print('o'*20) node["parent"] node["id"] print('o'*20) new_dev.set("x",str(float(node["position"]["x"])/1000)) new_dev.set("y",str(float(node["position"]["y"])/1000)) new_dev.set("z",str(float(node["position"]["z"])/1000)) if "rotation" in node["config"]: new_dev.set("rx",str(float(node["config"]["rotation"]["x"]))) new_dev.set("ry",str(float(node["config"]["rotation"]["y"]))) new_dev.set("r",str(float(node["config"]["rotation"]["z"]))) if "device_config" in node["config"]: for key, value in node["config"]["device_config"].items(): new_dev.set(key, str(float(value))) # 添加ros2_controller if node['class'].startswith('moveit.'): new_include_controller = etree.SubElement(self.root, f"{{{xacro_uri}}}include") new_include_controller.set("filename", f"{str(self.mesh_path)}/devices/{model_config['mesh']}/config/macro.ros2_control.xacro") new_controller = etree.SubElement(self.root, f"{{{xacro_uri}}}{model_config['mesh']}_ros2_control") new_controller.set("device_name", node["id"]+"_") new_controller.set("mesh_path", str(self.mesh_path)) # 添加moveit的srdf new_include_srdf = etree.SubElement(self.root_srdf, f"{{{xacro_uri}}}include") new_include_srdf.set("filename", f"{str(self.mesh_path)}/devices/{model_config['mesh']}/config/macro.srdf.xacro") new_srdf = etree.SubElement(self.root_srdf, f"{{{xacro_uri}}}{model_config['mesh']}_srdf") new_srdf.set("device_name", node["id"]+"_") self.moveit_nodes[node["id"]] = model_config['mesh'] else: print("错误的注册表类型!") re = etree.tostring(self.root, encoding="unicode") doc = xacro.parse(re) xacro.process_doc(doc) self.urdf_str = doc.toxml() re_srdf = etree.tostring(self.root_srdf, encoding="unicode") doc_srdf = xacro.parse(re_srdf) xacro.process_doc(doc_srdf) self.urdf_str_srdf = doc_srdf.toxml() if self.moveit_nodes: self.moveit_init() def moveit_init(self): for name, config in self.moveit_nodes.items(): controller_dict = yaml.safe_load(open(f"{str(self.mesh_path)}/devices/{config}/config/ros2_controllers.yaml", "r")) moveit_dict = yaml.safe_load(open(f"{str(self.mesh_path)}/devices/{config}/config/moveit_controllers.yaml", "r")) kinematics_dict = yaml.safe_load(open(f"{str(self.mesh_path)}/devices/{config}/config/kinematics.yaml", "r")) for key_kinematics, value_kinematics in kinematics_dict.items(): self.moveit_nodes_kinematics[f'{name}_{key_kinematics}'] = value_kinematics for key, value in controller_dict['controller_manager']['ros__parameters'].items(): if key == 'update_rate' or key == 'joint_state_broadcaster': continue self.ros2_controllers_yaml['controller_manager']['ros__parameters'][f"{name}_{key}"] = value controller_dict[key]['ros__parameters']['joints'] = [f"{name}_{joint}" for joint in controller_dict[key]['ros__parameters']['joints']] self.ros2_controllers_yaml[f"{name}_{key}"] = controller_dict[key] for controller_name in moveit_dict['moveit_simple_controller_manager']['controller_names']: self.moveit_controllers_yaml['moveit_simple_controller_manager']['controller_names'].append(f"{name}_{controller_name}") moveit_dict['moveit_simple_controller_manager'][controller_name]['joints'] = [f"{name}_{joint}" for joint in moveit_dict['moveit_simple_controller_manager'][controller_name]['joints']] self.moveit_controllers_yaml['moveit_simple_controller_manager'][f"{name}_{controller_name}"] = moveit_dict['moveit_simple_controller_manager'][controller_name] def create_launch_description(self) -> LaunchDescription: """ 创建launch描述,包含robot_state_publisher和move_group节点 Args: urdf_str: URDF文本 Returns: LaunchDescription: launch描述对象 """ moveit_configs_utils_path = Path(get_package_share_directory("moveit_configs_utils")) default_folder = moveit_configs_utils_path / "default_configs" planning_pattern = re.compile("^(.*)_planning.yaml$") pipelines = [] for pipeline in get_pattern_matches(default_folder, planning_pattern): if pipeline not in pipelines: pipelines.append(pipeline) if "ompl" in pipelines: default_planning_pipeline = "ompl" else: default_planning_pipeline = pipelines[0] planning_pipelines = { "planning_pipelines": pipelines, "default_planning_pipeline": default_planning_pipeline, } for pipeline in pipelines: planning_pipelines[pipeline] = load_yaml( default_folder / f"{pipeline}_planning.yaml" ) if "ompl" in planning_pipelines: ompl_config = planning_pipelines["ompl"] if "planner_configs" not in ompl_config: ompl_config.update(load_yaml(default_folder / "ompl_defaults.yaml")) yaml.safe_dump(self.ros2_controllers_yaml, open(f"{str(self.mesh_path)}/ros2_controllers.yaml", "w")) robot_description_planning = { "default_velocity_scaling_factor": 0.1, "default_acceleration_scaling_factor": 0.1, "cartesian_limits": { "max_trans_vel": 1.0, "max_trans_acc": 2.25, "max_trans_dec": -5.0, "max_rot_vel": 1.57 } } # 解析URDF文件 robot_description = self.urdf_str urdf_str_srdf = self.urdf_str_srdf kinematics_dict = self.moveit_nodes_kinematics if self.moveit_nodes: controllers = [] ros2_controllers = ParameterFile(f"{str(self.mesh_path)}/ros2_controllers.yaml", allow_substs=True) controllers.append( nd( package="controller_manager", executable="ros2_control_node", output='screen', parameters=[ {"robot_description": robot_description}, ros2_controllers, ] ) ) for controller in self.moveit_controllers_yaml['moveit_simple_controller_manager']['controller_names']: controllers.append( nd( package="controller_manager", executable="spawner", arguments=[f"{controller}", "--controller-manager", f"controller_manager"], output="screen", ) ) controllers.append( nd( package="controller_manager", executable="spawner", arguments=["joint_state_broadcaster", "--controller-manager", f"controller_manager"], output="screen", ) ) for i in controllers: self.launch_description.add_action(i) else: ros2_controllers = None # 创建robot_state_publisher节点 robot_state_publisher = nd( package='robot_state_publisher', executable='robot_state_publisher', name='robot_state_publisher', output='screen', parameters=[{ 'robot_description': robot_description, 'use_sim_time': False }, # kinematics_dict ] ) # 创建move_group节点 move_group = nd( package='moveit_ros_move_group', executable='move_group', output='screen', parameters=[{ 'allow_trajectory_execution': True, 'robot_description': robot_description, 'robot_description_semantic': urdf_str_srdf, 'robot_description_kinematics': kinematics_dict, 'capabilities': '', 'disable_capabilities': '', 'monitor_dynamics': False, 'publish_monitored_planning_scene': True, 'publish_robot_description_semantic': True, 'publish_planning_scene': True, 'publish_geometry_updates': True, 'publish_state_updates': True, 'publish_transforms_updates': True, # 'robot_description_planning': robot_description_planning, }, self.moveit_controllers_yaml, # ompl_planning_pipeline_config, robot_description_planning, planning_pipelines, ] ) # 将节点添加到launch描述中 self.launch_description.add_action(robot_state_publisher) # self.launch_description.add_action(joint_state_publisher_node) self.launch_description.add_action(move_group) # 如果启用RViz,添加RViz节点 if self.enable_rviz: rviz_node = nd( package='rviz2', executable='rviz2', name='rviz2', arguments=['-d', f"{str(self.mesh_path)}/view_robot.rviz"], output='screen', parameters=[ {'robot_description_kinematics': kinematics_dict, }, robot_description_planning, planning_pipelines, ] ) self.launch_description.add_action(rviz_node) return self.launch_description def start(self) -> None: """ 启动可视化服务 Args: urdf_str: URDF文件路径 """ launch_description = self.create_launch_description() # print('--------------------------------') # print(self.moveit_controllers_yaml) # print('--------------------------------') # print(self.urdf_str) # print('--------------------------------') self.launch_service.include_launch_description(launch_description) self.launch_service.run()