From b150821418669f4da3eb9a34f9907769156b169d Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Mon, 9 Jun 2025 04:32:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dmoveit=20=E5=A2=9E=E5=8A=A0po?= =?UTF-8?q?st=5Finit=E9=98=B6=E6=AE=B5=EF=BC=8C=E7=BB=99=E4=BA=88ros=5Fnod?= =?UTF-8?q?e=E5=8F=8D=E5=90=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- unilabos/devices/ros_dev/moveit_interface.py | 424 ++++++++----------- unilabos/registry/devices/moveit_config.yaml | 60 ++- unilabos/ros/nodes/base_device_node.py | 6 + 3 files changed, 229 insertions(+), 261 deletions(-) diff --git a/unilabos/devices/ros_dev/moveit_interface.py b/unilabos/devices/ros_dev/moveit_interface.py index b41e5a8..a265513 100644 --- a/unilabos/devices/ros_dev/moveit_interface.py +++ b/unilabos/devices/ros_dev/moveit_interface.py @@ -1,109 +1,68 @@ -from pathlib import Path -from threading import Thread +import json import time - -import rclpy -import rclpy.duration -import rclpy.time -from unilabos_msgs.action import SendCmd -from rclpy.action import ActionServer, ActionClient -from rclpy.action.server import ServerGoalHandle -from rclpy.callback_groups import ReentrantCallbackGroup -from moveit_msgs.msg import JointConstraint, Constraints from copy import deepcopy +from pathlib import Path + +from moveit_msgs.msg import JointConstraint, Constraints +from rclpy.action import ActionClient +from rclpy.callback_groups import ReentrantCallbackGroup +from tf2_ros import Buffer, TransformListener +from unilabos_msgs.action import SendCmd from unilabos.devices.ros_dev.moveit2 import MoveIt2 - -import numpy as np from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode -# from tf_transformations import quaternion_from_euler -from tf2_ros import Buffer, TransformListener -import json +class MoveitInterface: + _ros_node: BaseROS2DeviceNode + tf_buffer: Buffer + tf_listener: TransformListener -class MoveitInterface(BaseROS2DeviceNode): - def __init__(self, device_id, moveit_type, joint_poses, resource_tracker, rotation = None, device_config = None): - super().__init__( - driver_instance=self, - device_id=device_id, - status_types={}, - action_value_mappings={}, - hardware_interface={}, - print_publish=False, - resource_tracker=resource_tracker, + def __init__(self, moveit_type, joint_poses, rotation=None, device_config=None): + self.device_config = device_config + self.rotation = rotation + self.callback_group = ReentrantCallbackGroup() + self.data_config = json.load( + open( + f"{Path(__file__).parent.parent.parent.absolute()}/device_mesh/devices/{moveit_type}/config/move_group.json", + encoding="utf-8", + ) ) - self.callback_group = ReentrantCallbackGroup() - data_config = json.load(open(f"{Path(__file__).parent.parent.parent.absolute()}/device_mesh/devices/{moveit_type}/config/move_group.json", encoding="utf-8")) - - self.tf_buffer = Buffer() - self.tf_listener = TransformListener(self.tf_buffer, self) - - self.arm_move_flag = False - - self.move_option = ['pick', 'place', 'side_pick', 'side_place'] - self.joint_poses = joint_poses - self.cartesian_flag = False - - self.mesh_group = ['reactor','sample','beaker'] - + self.arm_move_flag = False + self.move_option = ["pick", "place", "side_pick", "side_place"] + self.joint_poses = joint_poses + self.cartesian_flag = False + self.mesh_group = ["reactor", "sample", "beaker"] self.moveit2 = {} - - self.resource_action = None self.resource_client = None self.resource_action_ok = False - - for move_group, config in data_config.items(): + def post_init(self, ros_node: BaseROS2DeviceNode): + self._ros_node = ros_node + self.tf_buffer = Buffer() + self.tf_listener = TransformListener(self.tf_buffer, self._ros_node) - base_link_name = f"{device_id}_{config['base_link_name']}" - end_effector_name = f"{device_id}_{config['end_effector_name']}" - joint_names = [f"{device_id}_{name}" for name in config['joint_names']] + for move_group, config in self.data_config.items(): + + base_link_name = f"{self._ros_node.device_id}_{config['base_link_name']}" + end_effector_name = f"{self._ros_node.device_id}_{config['end_effector_name']}" + joint_names = [f"{self._ros_node.device_id}_{name}" for name in config["joint_names"]] self.moveit2[f"{move_group}"] = MoveIt2( - node=self, + node=self._ros_node, joint_names=joint_names, base_link_name=base_link_name, end_effector_name=end_effector_name, - group_name=f"{device_id}_{move_group}", - callback_group=self.callback_group, - use_move_group_action= True, - ignore_new_calls_while_executing = True + group_name=f"{self._ros_node.device_id}_{move_group}", + callback_group=self._ros_node.callback_group, + use_move_group_action=True, + ignore_new_calls_while_executing=True, ) self.moveit2[f"{move_group}"].allowed_planning_time = 3.0 - - self.moveit_pose_server = ActionServer( - self, - SendCmd, - f'set_position', - execute_callback=self.callback, - callback_group=self.callback_group, - result_timeout=5000 - ) - - self.moveit_pick_server = ActionServer( - self, - SendCmd, - f'pick_and_place', - execute_callback=self.pick_and_place_callback, - callback_group=self.callback_group, - result_timeout=5000 - ) - - - self.moveit_joint_server = ActionServer( - self, - SendCmd, - f'set_status', - execute_callback=self.set_status_callback, - callback_group=self.callback_group, - result_timeout=5000 - ) - - self.create_timer(1, self.wait_for_resource_action,callback_group=ReentrantCallbackGroup()) + self._ros_node.create_timer(1, self.wait_for_resource_action, callback_group=self._ros_node.callback_group) def wait_for_resource_action(self): @@ -115,30 +74,24 @@ class MoveitInterface(BaseROS2DeviceNode): self.resource_client = ActionClient(self, SendCmd, self.resource_action) self.resource_action_ok = True while not self.resource_client.wait_for_server(timeout_sec=5.0): - self.get_logger().info('等待 TfUpdate 服务器...') - + self._ros_node.lab_logger().info("等待 TfUpdate 服务器...") def check_tf_update_actions(self): - topics = self.get_topic_names_and_types() - - + topics = self._ros_node.get_topic_names_and_types() for topic_item in topics: - topic_name, topic_types = topic_item - - if 'action_msgs/msg/GoalStatusArray' in topic_types: + if "action_msgs/msg/GoalStatusArray" in topic_types: # 删除 /_action/status 部分 - base_name = topic_name.replace('/_action/status', '') + base_name = topic_name.replace("/_action/status", "") # 检查最后一个部分是否为 tf_update - parts = base_name.split('/') - if parts and parts[-1] == 'tf_update': + parts = base_name.split("/") + if parts and parts[-1] == "tf_update": return base_name - - return None - - def callback(self,goal_handle: ServerGoalHandle): + return None + + def set_position(self, command): """使用moveit 移动到指定点 Args: command: A JSON-formatted string that includes quaternion, speed, position @@ -147,42 +100,33 @@ class MoveitInterface(BaseROS2DeviceNode): quaternion (list) : 点的姿态(四元数) [x,y,z,w] move_group (string) : The move group moveit will plan speed (float) : The speed of the movement, speed > 0 - retry (int) : Retry times when moveit plan fails + retry (int) : Retry times when moveit plan fails Returns: None """ result = SendCmd.Result() - - try: - cmd_str = str(goal_handle.request.command).replace('\'','\"') - cmd_dict = json.loads(cmd_str) - - self.moveit_task(**cmd_dict) - goal_handle.succeed() - result.success=True - - except Exception as e: - print(e) - goal_handle.abort() - result.success=False - + cmd_str = command.replace("'", '"') + cmd_dict = json.loads(cmd_str) + self.moveit_task(**cmd_dict) return result - def moveit_task(self,move_group,position,quaternion,speed=1,retry=10,cartesian=False, target_link = None ,offsets =[0,0,0]): + def moveit_task( + self, move_group, position, quaternion, speed=1, retry=10, cartesian=False, target_link=None, offsets=[0, 0, 0] + ): speed_ = float(max(0.1, min(speed, 1))) - self.moveit2[move_group].max_velocity = speed_ - self.moveit2[move_group].max_acceleration = speed_ + self.moveit2[move_group].max_velocity = speed_ + self.moveit2[move_group].max_acceleration = speed_ re_ = False - pose_result =[x + y for x, y in zip(position, offsets)] + pose_result = [x + y for x, y in zip(position, offsets)] # print(pose_result) - while retry > -1 and not re_ : + while retry > -1 and not re_: self.moveit2[move_group].move_to_pose( target_link=target_link, @@ -191,14 +135,14 @@ class MoveitInterface(BaseROS2DeviceNode): cartesian=cartesian, # cartesian_fraction_threshold=0.0, cartesian_max_step=0.01, - weight_position=1.0 + weight_position=1.0, ) - re_ = self.moveit2[move_group].wait_until_executed() + re_ = self.moveit2[move_group].wait_until_executed() retry += -1 return re_ - def moveit_joint_task(self, move_group, joint_positions, joint_names = None,speed=1,retry=10): + def moveit_joint_task(self, move_group, joint_positions, joint_names=None, speed=1, retry=10): re_ = False @@ -206,107 +150,123 @@ class MoveitInterface(BaseROS2DeviceNode): speed_ = float(max(0.1, min(speed, 1))) - self.moveit2[move_group].max_velocity = speed_ - self.moveit2[move_group].max_acceleration = speed_ - - while retry > -1 and not re_ : + self.moveit2[move_group].max_velocity = speed_ + self.moveit2[move_group].max_acceleration = speed_ - self.moveit2[move_group].move_to_configuration(joint_positions=joint_positions_,joint_names=joint_names) - re_ = self.moveit2[move_group].wait_until_executed() + while retry > -1 and not re_: + + self.moveit2[move_group].move_to_configuration(joint_positions=joint_positions_, joint_names=joint_names) + re_ = self.moveit2[move_group].wait_until_executed() retry += -1 print(self.moveit2[move_group].compute_fk(joint_positions)) return re_ - - def resource_manager(self,resource, parent_link): + def resource_manager(self, resource, parent_link): goal_msg = SendCmd.Goal() str_dict = {} str_dict[resource] = parent_link goal_msg.command = json.dumps(str_dict) + assert self.resource_client is not None self.resource_client.send_goal(goal_msg) return True - - - def pick_and_place_callback(self,goal_handle: ServerGoalHandle): - + def pick_and_place(self, command: str): """ - Using MoveIt to make the robotic arm pick or place materials to a target point. + Using MoveIt to make the robotic arm pick or place materials to a target point. - Args: - command: A JSON-formatted string that includes option, target, speed, lift_height, mt_height + Args: + command: A JSON-formatted string that includes option, target, speed, lift_height, mt_height - *option (string) : Action type: pick/place/side_pick/side_place - *move_group (string): The move group moveit will plan - *status(string) : Target pose - resource(string) : The target resource - x_distance (float) : The distance to the target in x direction(meters) - y_distance (float) : The distance to the target in y direction(meters) - lift_height (float) : The height at which the material should be lifted(meters) - retry (float) : Retry times when moveit plan fails - speed (float) : The speed of the movement, speed > 0 - Returns: - None + *option (string) : Action type: pick/place/side_pick/side_place + *move_group (string): The move group moveit will plan + *status(string) : Target pose + resource(string) : The target resource + x_distance (float) : The distance to the target in x direction(meters) + y_distance (float) : The distance to the target in y direction(meters) + lift_height (float) : The height at which the material should be lifted(meters) + retry (float) : Retry times when moveit plan fails + speed (float) : The speed of the movement, speed > 0 + Returns: + None """ result = SendCmd.Result() try: - cmd_str = str(goal_handle.request.command).replace('\'','\"') + cmd_str = str(command).replace("'", '"') cmd_dict = json.loads(cmd_str) if cmd_dict["option"] in self.move_option: option_index = self.move_option.index(cmd_dict["option"]) place_flag = option_index % 2 - + config = {} function_list = [] status = cmd_dict["status"] joint_positions_ = self.joint_poses[cmd_dict["move_group"]][status] - config.update({k: cmd_dict[k] for k in ["speed", "retry", 'move_group'] if k in cmd_dict}) - + config.update({k: cmd_dict[k] for k in ["speed", "retry", "move_group"] if k in cmd_dict}) # 夹取 if not place_flag: - if 'target' in cmd_dict.keys(): + if "target" in cmd_dict.keys(): function_list.append(lambda: self.resource_manager(cmd_dict["resource"], cmd_dict["target"])) else: - function_list.append(lambda: self.resource_manager(cmd_dict["resource"], self.moveit2[cmd_dict["move_group"]].end_effector_name)) + function_list.append( + lambda: self.resource_manager( + cmd_dict["resource"], self.moveit2[cmd_dict["move_group"]].end_effector_name + ) + ) else: - function_list.append(lambda: self.resource_manager(cmd_dict["resource"], 'world')) + function_list.append(lambda: self.resource_manager(cmd_dict["resource"], "world")) constraints = [] if "constraints" in cmd_dict.keys(): - + for i in range(len(cmd_dict["constraints"])): v = float(cmd_dict["constraints"][i]) if v > 0: - constraints.append(JointConstraint( - joint_name=self.moveit2[cmd_dict["move_group"]].joint_names[i], - position=joint_positions_[i], - tolerance_above=v, - tolerance_below=v, - weight=1.0 - )) + constraints.append( + JointConstraint( + joint_name=self.moveit2[cmd_dict["move_group"]].joint_names[i], + position=joint_positions_[i], + tolerance_above=v, + tolerance_below=v, + weight=1.0, + ) + ) - if "lift_height" in cmd_dict.keys(): - + retval = self.moveit2[cmd_dict["move_group"]].compute_fk(joint_positions_) pose = [retval.pose.position.x, retval.pose.position.y, retval.pose.position.z] - quaternion = [retval.pose.orientation.x, retval.pose.orientation.y, retval.pose.orientation.z, retval.pose.orientation.w] - - function_list = [lambda: self.moveit_task(position=[retval.pose.position.x, retval.pose.position.y, retval.pose.position.z], quaternion=quaternion,**config, cartesian=self.cartesian_flag)] + function_list + quaternion = [ + retval.pose.orientation.x, + retval.pose.orientation.y, + retval.pose.orientation.z, + retval.pose.orientation.w, + ] + + function_list = [ + lambda: self.moveit_task( + position=[retval.pose.position.x, retval.pose.position.y, retval.pose.position.z], + quaternion=quaternion, + **config, + cartesian=self.cartesian_flag, + ) + ] + function_list pose[2] += float(cmd_dict["lift_height"]) - function_list.append(lambda: self.moveit_task(position=pose, quaternion=quaternion,**config, cartesian=self.cartesian_flag)) + function_list.append( + lambda: self.moveit_task( + position=pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag + ) + ) end_pose = pose - if "x_distance" in cmd_dict.keys() or "y_distance" in cmd_dict.keys(): if "x_distance" in cmd_dict.keys(): deep_pose = deepcopy(pose) @@ -314,27 +274,37 @@ class MoveitInterface(BaseROS2DeviceNode): elif "y_distance" in cmd_dict.keys(): deep_pose = deepcopy(pose) deep_pose[1] += float(cmd_dict["y_distance"]) - - function_list = [lambda: self.moveit_task(position=pose, quaternion=quaternion,**config, cartesian=self.cartesian_flag)] + function_list - function_list.append(lambda: self.moveit_task(position=deep_pose, quaternion=quaternion,**config, cartesian=self.cartesian_flag)) + + function_list = [ + lambda: self.moveit_task( + position=pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag + ) + ] + function_list + function_list.append( + lambda: self.moveit_task( + position=deep_pose, quaternion=quaternion, **config, cartesian=self.cartesian_flag + ) + ) end_pose = deep_pose retval_ik = self.moveit2[cmd_dict["move_group"]].compute_ik( - position=end_pose, - quat_xyzw=quaternion, - constraints=Constraints(joint_constraints=constraints)) - - - position_ = [retval_ik.position[retval_ik.name.index(i)] for i in self.moveit2[cmd_dict["move_group"]].joint_names] - - function_list = [lambda: self.moveit_joint_task( - joint_positions=position_, - joint_names=self.moveit2[cmd_dict["move_group"]].joint_names, - **config, - )] + function_list + position=end_pose, quat_xyzw=quaternion, constraints=Constraints(joint_constraints=constraints) + ) + position_ = [ + retval_ik.position[retval_ik.name.index(i)] + for i in self.moveit2[cmd_dict["move_group"]].joint_names + ] + function_list = [ + lambda: self.moveit_joint_task( + joint_positions=position_, + joint_names=self.moveit2[cmd_dict["move_group"]].joint_names, + **config, + ) + ] + function_list else: - - function_list = [lambda: self.moveit_joint_task(**config,joint_positions=joint_positions_)] + function_list + function_list = [ + lambda: self.moveit_joint_task(**config, joint_positions=joint_positions_) + ] + function_list for i in range(len(function_list)): if i == 0: @@ -343,93 +313,59 @@ class MoveitInterface(BaseROS2DeviceNode): self.cartesian_flag = True re = function_list[i]() - if not re: - - print(i,re) - goal_handle.abort() - result.success=False - + print(i, re) + result.success = False return result - - goal_handle.succeed() - result.success=True - + result.success = True + except Exception as e: print(e) - goal_handle.abort() self.cartesian_flag = False - result.success=False + result.success = False return result - - def set_status_callback(self,goal_handle: ServerGoalHandle): + def set_status(self, command: str): """ - Goto home position + Goto home position - Args: - command: A JSON-formatted string that includes speed - *status (string) : The joint status moveit will plan - *move_group (string): The move group moveit will plan - separate (list) : The joint index to be separated - lift_height (float) : The height at which the material should be lifted(meters) - x_distance (float) : The distance to the target in x direction(meters) - y_distance (float) : The distance to the target in y direction(meters) - speed (float) : The speed of the movement, speed > 0 - retry (float) : Retry times when moveit plan fails + Args: + command: A JSON-formatted string that includes speed + *status (string) : The joint status moveit will plan + *move_group (string): The move group moveit will plan + separate (list) : The joint index to be separated + lift_height (float) : The height at which the material should be lifted(meters) + x_distance (float) : The distance to the target in x direction(meters) + y_distance (float) : The distance to the target in y direction(meters) + speed (float) : The speed of the movement, speed > 0 + retry (float) : Retry times when moveit plan fails - Returns: - None + Returns: + None """ result = SendCmd.Result() try: - cmd_str = str(goal_handle.request.command).replace('\'','\"') + cmd_str = command.replace("'", '"') cmd_dict = json.loads(cmd_str) - config = {} config["move_group"] = cmd_dict["move_group"] if "speed" in cmd_dict.keys(): config["speed"] = cmd_dict["speed"] if "retry" in cmd_dict.keys(): config["retry"] = cmd_dict["retry"] - - - - if "separate" in cmd_dict.keys(): - separate = cmd_dict["separate"] - else: - separate = None status = cmd_dict["status"] joint_positions_ = self.joint_poses[cmd_dict["move_group"]][status] - - if separate is not None : - - separate_j = [joint_positions_[i] for i in separate] - separate_n = [self.joint_names[i] for i in separate] - - re = self.moveit_joint_task(**config, joint_positions=separate_j,joint_names=separate_n) - if not re: - goal_handle.abort() - result.success=False - return result - - re = self.moveit_joint_task(**config,joint_positions=joint_positions_) - + re = self.moveit_joint_task(**config, joint_positions=joint_positions_) if not re: - goal_handle.abort() - result.success=False + result.success = False return result - - goal_handle.succeed() - result.success=True - + result.success = True except Exception as e: print(e) - goal_handle.abort() - result.success=False + result.success = False - return result \ No newline at end of file + return result diff --git a/unilabos/registry/devices/moveit_config.yaml b/unilabos/registry/devices/moveit_config.yaml index c523e47..383affc 100644 --- a/unilabos/registry/devices/moveit_config.yaml +++ b/unilabos/registry/devices/moveit_config.yaml @@ -2,29 +2,55 @@ moveit.toyo_xyz: description: Toyo XYZ class: module: unilabos.devices.ros_dev.moveit_interface:MoveitInterface - type: ros2 + type: python + action_value_mappings: + set_position: + type: SendCmd + goal: + command: command + feedback: { } + result: { } + pick_and_place: + type: SendCmd + goal: + command: command + feedback: { } + result: { } + set_status: + type: SendCmd + goal: + command: command + feedback: { } + result: { } + model: type: device mesh: toyo_xyz moveit.benyao_arm: description: Benyao Arm - class: - module: unilabos.devices.ros_dev.moveit_interface:MoveitInterface - type: ros2 model: type: device mesh: benyao_arm - action_value_mappings: - set_position: - type: SendCmd - feedback: {} - result: {} - pick_and_place: - type: SendCmd - feedback: {} - result: {} - set_status: - type: SendCmd - feedback: {} - result: {} + class: + module: unilabos.devices.ros_dev.moveit_interface:MoveitInterface + type: python + action_value_mappings: + set_position: + type: SendCmd + goal: + command: command + feedback: {} + result: {} + pick_and_place: + type: SendCmd + goal: + command: command + feedback: {} + result: {} + set_status: + type: SendCmd + goal: + command: command + feedback: {} + result: {} diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index ddc5fca..dcf0759 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -924,6 +924,12 @@ class ROS2DeviceNode: ) self._ros_node: BaseROS2DeviceNode self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}") + self.driver_instance._ros_node = self._ros_node # type: ignore + if hasattr(self.driver_instance, "post_init"): + try: + self.driver_instance.post_init(self._ros_node) # type: ignore + except Exception as e: + self._ros_node.lab_logger().error(f"设备后初始化失败: {e}") def _start_loop(self): def run_event_loop():