Files
Uni-Lab-OS/unilabos/device_mesh/resource_visalization.py
q434343 f3637d4043 Device visualization (#22)
* add 3d visualization

* 完成在main中启动设备可视化

完成在main中启动设备可视化,并输出物料ID:mesh的对应关系resource_model

添加物料模型管理类,遍历物料与resource_model,完成TF数据收集

* 完成TF发布

* 修改模型方向,在yaml中添加变换属性

* 添加物料tf变化时,发送topic到前端

另外修改了物料初始化的方法,防止在tf还未发布时提前建立物料模型与发布话题

* 添加关节发布节点与物料可视化节点进入unilab

* 使用json启动plr与3D模型仿真

* 完成启动OT并联动rviz

* add 3d visualization

* 完成在main中启动设备可视化

完成在main中启动设备可视化,并输出物料ID:mesh的对应关系resource_model

添加物料模型管理类,遍历物料与resource_model,完成TF数据收集

* 完成TF发布

* 修改模型方向,在yaml中添加变换属性

* 添加物料tf变化时,发送topic到前端

另外修改了物料初始化的方法,防止在tf还未发布时提前建立物料模型与发布话题

* 添加关节发布节点与物料可视化节点进入unilab

* 使用json启动plr与3D模型仿真

* 完成启动OT并联动rviz

* 修复rviz位置问题,

修复rviz位置问题,
在无tf变动时减缓发送频率
在backend中添加物料跟随方法

* fix: running logic

* fix: running logic

* fix: missing ot

* 在main中直接初始化republisher和物料的mesh节点

* 将joint_republisher和resource_mesh_manager添加进 main_slave_run.py中

* 编写mesh添加action

* add action

* fix

* fix: browser on rviz

* fix: cloud bridge error fallback to local

* fix: salve auto run rviz

* 初始化两个plate

---------

Co-authored-by: zhangshixiang <@zhangshixiang>
Co-authored-by: wznln <18435084+Xuwznln@users.noreply.github.com>
2025-05-07 02:12:29 +08:00

174 lines
7.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from pathlib import Path
from launch import LaunchService
from launch import LaunchDescription
from launch_ros.actions import Node as nd
import xacro
from lxml import etree
from unilabos.registry.registry import lab_registry
class ResourceVisualization:
def __init__(self, device: dict, resource: dict, enable_rviz: bool = True):
"""初始化资源可视化类
该类用于将设备和资源的3D模型可视化展示。通过解析设备和资源的配置信息,
从注册表中获取对应的3D模型文件,并使用ROS2和RViz进行可视化。
Args:
device (dict): 设备配置字典,包含设备的类型、位置等信息
resource (dict): 资源配置字典,包含资源的类型、位置等信息
registry (dict): 注册表字典,包含设备和资源类型的注册信息
enable_rviz (bool, optional): 是否启用RViz可视化. Defaults to True.
"""
self.launch_service = LaunchService()
self.launch_description = LaunchDescription()
self.resource_dict = resource
self.resource_model = {}
self.resource_type = ['deck', 'plate', 'container']
self.mesh_path = Path(__file__).parent.absolute()
self.enable_rviz = enable_rviz
registry = lab_registry
self.srdf_str = '''
<?xml version="1.0" encoding="UTF-8"?>
<robot name="minimal">
</robot>
'''
self.robot_state_str= '''<?xml version="1.0" ?>
<robot xmlns:xacro="http://ros.org/wiki/xacro" name="full_dev">
<link name="world"/>
</robot>
'''
self.root = etree.fromstring(self.robot_state_str)
xacro_uri = self.root.nsmap["xacro"]
# 遍历设备节点
for node in device.values():
if node['type'] == 'device' and node['class'] != '':
device_class = node['class']
# 检查设备类型是否在注册表中
if device_class not in registry.device_type_registry.keys():
raise ValueError(f"设备类型 {device_class} 未在注册表中注册")
elif node['type'] in self.resource_type:
# print(registry.resource_type_registry)
resource_class = node['class']
if resource_class not in registry.resource_type_registry.keys():
raise ValueError(f"资源类型 {resource_class} 未在注册表中注册")
elif "model" in registry.resource_type_registry[resource_class].keys():
model_config = registry.resource_type_registry[resource_class]['model']
if model_config['type'] == 'resource':
self.resource_model[node['id']] = {
'mesh': f"{str(self.mesh_path)}/resources/{model_config['mesh']}",
'mesh_tf': model_config['mesh_tf']}
if 'children_mesh' in model_config:
if model_config['children_mesh'] is not None:
self.resource_model[f"{node['id']}_"] = {
'mesh': f"{str(self.mesh_path)}/resources/{model_config['children_mesh']}",
'mesh_tf': model_config['children_mesh_tf']
}
elif model_config['type'] == 'device':
new_include = etree.SubElement(self.root, f"{{{xacro_uri}}}include")
new_include.set("filename", f"{str(self.mesh_path)}/devices/{model_config['mesh']}/macro_device.xacro")
new_dev = etree.SubElement(self.root, f"{{{xacro_uri}}}{model_config['mesh']}")
new_dev.set("parent_link", "world")
new_dev.set("mesh_path", str(self.mesh_path))
new_dev.set("device_name", node["id"]+"_")
new_dev.set("station_name", node["parent"]+'_')
new_dev.set("x",str(float(node["position"]["x"])/1000))
new_dev.set("y",str(float(node["position"]["y"])/1000))
new_dev.set("z",str(float(node["position"]["z"])/1000))
if "rotation" in node["config"]:
new_dev.set("r",str(float(node["config"]["rotation"]["z"])/1000))
else:
print("错误的注册表类型!")
re = etree.tostring(self.root, encoding="unicode")
doc = xacro.parse(re)
xacro.process_doc(doc)
self.urdf_str = doc.toxml()
def create_launch_description(self, urdf_str: str) -> LaunchDescription:
"""
创建launch描述包含robot_state_publisher和move_group节点
Args:
urdf_str: URDF文本
Returns:
LaunchDescription: launch描述对象
"""
# 解析URDF文件
robot_description = urdf_str
# 创建robot_state_publisher节点
robot_state_publisher = nd(
package='robot_state_publisher',
executable='robot_state_publisher',
name='robot_state_publisher',
output='screen',
parameters=[{
'robot_description': robot_description,
'use_sim_time': False
}]
)
# joint_state_publisher_node = nd(
# package='joint_state_publisher_gui', # 或 joint_state_publisher
# executable='joint_state_publisher_gui',
# name='joint_state_publisher',
# output='screen'
# )
# 创建move_group节点
move_group = nd(
package='moveit_ros_move_group',
executable='move_group',
output='screen',
parameters=[{
'robot_description': robot_description,
'robot_description_semantic': self.srdf_str,
'capabilities': '',
'disable_capabilities': '',
'monitor_dynamics': False,
'publish_monitored_planning_scene': True,
'publish_robot_description_semantic': True,
'publish_planning_scene': True,
'publish_geometry_updates': True,
'publish_state_updates': True,
'publish_transforms_updates': True,
}]
)
# 将节点添加到launch描述中
self.launch_description.add_action(robot_state_publisher)
# self.launch_description.add_action(joint_state_publisher_node)
self.launch_description.add_action(move_group)
# 如果启用RViz,添加RViz节点
if self.enable_rviz:
rviz_node = nd(
package='rviz2',
executable='rviz2',
name='rviz2',
arguments=['-d', f"{str(self.mesh_path)}/view_robot.rviz"],
output='screen'
)
self.launch_description.add_action(rviz_node)
return self.launch_description
def start(self) -> None:
"""
启动可视化服务
Args:
urdf_str: URDF文件路径
"""
launch_description = self.create_launch_description(self.urdf_str)
self.launch_service.include_launch_description(launch_description)
self.launch_service.run()