# Patch summary (descriptive header only; patch tools skip text that precedes the first
# "diff --git" line).
#
# unilabos/app/ws_client.py
#   * DeviceActionManager.cancel_job: cancelling the currently active job now deletes it from
#     active_jobs and all_jobs, marks it JobStatus.ENDED, promotes the next queued job for the
#     same device key to READY (update_timestamp() + set_ready_timeout(10)) and returns True.
#     The old branch only executed `pass`; the same logic was present but commented out.
#   * MessageProcessor cancel handling: HostNode.cancel_goal() is now called BEFORE the
#     DeviceActionManager state is cleaned up, for both the job_id path and the task_id path
#     (the task_id path first snapshots matching jobs under device_manager.lock).
#   * QueueProcessor completion handling: returns early with a debug log when the job is no
#     longer known to the manager, instead of formatting a log entry with empty fields.
#
# unilabos/ros/nodes/presets/host_node.py
#   * get_result_callback: wrapped in try/except, reports GoalStatus.STATUS_CANCELED as a
#     failed job with "Job was cancelled", and removes the job from self._goals.
#   * cancel_goal: now returns bool and attaches _cancel_goal_callback to the cancel future.
#   * _cancel_goal_callback: new; logs whether the cancel request was accepted or rejected.
#
# unilabos/ros/nodes/resource_tracker.py
#   * Local variable renamed current_extra -> c_extra and two comments dropped; the statements
#     are otherwise unchanged.
#
# NOTE(review): this file was restored from a whitespace-collapsed copy. The number of lines in
#   every hunk matches its @@ header, but exact leading indentation (and the alignment inside
#   removed commented-out code) is inferred from Python structure - verify against the target
#   tree before applying.
# NOTE(review): cancel_job promotes the next queued job synchronously, while cancel_goal only
#   issues cancel_goal_async(); the next job may become READY before the cancelled action has
#   actually stopped - confirm this is intended.
# NOTE(review): in get_result_callback the except path publishes "failed" to every bridge
#   without the `if job_id:` guard used on the normal path, and it also runs when a bridge
#   raises after earlier bridges were already notified - possible duplicate status; confirm.

diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py
index c91ca8ec..f0c29f37 100644
--- a/unilabos/app/ws_client.py
+++ b/unilabos/app/ws_client.py
@@ -261,29 +261,28 @@ class DeviceActionManager:
             device_key = job_info.device_action_key
 
             # 如果是正在执行的任务
-            if (
-                device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id
-            ):  # 后面需要和cancel_goal进行联动,而不是在这里进行处理,现在默认等待这个job结束
-                # del self.active_jobs[device_key]
-                # job_info.status = JobStatus.ENDED
-                # # 从all_jobs中移除
-                # del self.all_jobs[job_id]
-                # job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
-                # logger.info(f"[DeviceActionManager] Active job {job_log} cancelled for {device_key}")
+            if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id:
+                # 清理active job状态
+                del self.active_jobs[device_key]
+                job_info.status = JobStatus.ENDED
+                # 从all_jobs中移除
+                del self.all_jobs[job_id]
+                job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
+                logger.info(f"[DeviceActionManager] Active job {job_log} cancelled for {device_key}")
 
-                # # 启动下一个任务
-                # if device_key in self.device_queues and self.device_queues[device_key]:
-                #     next_job = self.device_queues[device_key].pop(0)
-                #     # 将下一个job设置为READY状态并放入active_jobs
-                #     next_job.status = JobStatus.READY
-                #     next_job.update_timestamp()
-                #     next_job.set_ready_timeout(10)
-                #     self.active_jobs[device_key] = next_job
-                #     next_job_log = format_job_log(next_job.job_id, next_job.task_id,
-                #                                   next_job.device_id, next_job.action_name)
-                #     logger.info(f"[DeviceActionManager] Next job {next_job_log} can start after cancel")
-                # return True
-                pass
+                # 启动下一个任务
+                if device_key in self.device_queues and self.device_queues[device_key]:
+                    next_job = self.device_queues[device_key].pop(0)
+                    # 将下一个job设置为READY状态并放入active_jobs
+                    next_job.status = JobStatus.READY
+                    next_job.update_timestamp()
+                    next_job.set_ready_timeout(10)
+                    self.active_jobs[device_key] = next_job
+                    next_job_log = format_job_log(
+                        next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
+                    )
+                    logger.info(f"[DeviceActionManager] Next job {next_job_log} can start after cancel")
+                return True
 
             # 如果是排队中的任务
             elif device_key in self.device_queues:
@@ -741,31 +740,51 @@ class MessageProcessor:
                 job_info.action_name if job_info else "",
             )
 
-            # 按job_id取消单个job
+            # 先通知HostNode取消ROS2 action(如果存在)
+            host_node = HostNode.get_instance(0)
+            ros_cancel_success = False
+            if host_node:
+                ros_cancel_success = host_node.cancel_goal(job_id)
+                if ros_cancel_success:
+                    logger.info(f"[MessageProcessor] ROS2 cancel request sent for job {job_log}")
+                else:
+                    logger.debug(
+                        f"[MessageProcessor] Job {job_log} not in ROS2 goals " "(may be queued or already finished)"
+                    )
+
+            # 按job_id取消单个job(清理状态机)
             success = self.device_manager.cancel_job(job_id)
             if success:
-                # 通知HostNode取消
-                host_node = HostNode.get_instance(0)
-                if host_node:
-                    host_node.cancel_goal(job_id)
-                logger.info(f"[MessageProcessor] Job {job_log} cancelled")
+                logger.info(f"[MessageProcessor] Job {job_log} cancelled from queue/active list")
 
                 # 通知QueueProcessor有队列更新
                 if self.queue_processor:
                     self.queue_processor.notify_queue_update()
             else:
-                logger.warning(f"[MessageProcessor] Failed to cancel job {job_log}")
+                logger.warning(f"[MessageProcessor] Failed to cancel job {job_log} from queue")
 
         elif task_id:
-            # 按task_id取消所有相关job
+            # 先通知HostNode取消所有ROS2 actions
+            # 需要先获取所有相关job_ids
+            jobs_to_cancel = []
+            with self.device_manager.lock:
+                jobs_to_cancel = [
+                    job_info for job_info in self.device_manager.all_jobs.values() if job_info.task_id == task_id
+                ]
+
+            host_node = HostNode.get_instance(0)
+            if host_node and jobs_to_cancel:
+                ros_cancelled_count = 0
+                for job_info in jobs_to_cancel:
+                    if host_node.cancel_goal(job_info.job_id):
+                        ros_cancelled_count += 1
+                logger.info(
+                    f"[MessageProcessor] Sent ROS2 cancel for " f"{ros_cancelled_count}/{len(jobs_to_cancel)} jobs"
+                )
+
+            # 按task_id取消所有相关job(清理状态机)
             cancelled_job_ids = self.device_manager.cancel_jobs_by_task_id(task_id)
             if cancelled_job_ids:
-                # 通知HostNode取消所有job
-                host_node = HostNode.get_instance(0)
-                if host_node:
-                    for cancelled_job_id in cancelled_job_ids:
-                        host_node.cancel_goal(cancelled_job_id)
-
                 logger.info(f"[MessageProcessor] Cancelled {len(cancelled_job_ids)} jobs for task_id: {task_id}")
 
                 # 通知QueueProcessor有队列更新
@@ -1056,11 +1075,19 @@ class QueueProcessor:
         """处理任务完成"""
         # 获取job信息用于日志
         job_info = self.device_manager.get_job_info(job_id)
+
+        # 如果job不存在,说明可能已被手动取消
+        if not job_info:
+            logger.debug(
+                f"[QueueProcessor] Job {job_id[:8]} not found in manager " "(may have been cancelled manually)"
+            )
+            return
+
         job_log = format_job_log(
             job_id,
-            job_info.task_id if job_info else "",
-            job_info.device_id if job_info else "",
-            job_info.action_name if job_info else "",
+            job_info.task_id,
+            job_info.device_id,
+            job_info.action_name,
         )
 
         logger.info(f"[QueueProcessor] Job {job_log} completed with status: {status}")
diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py
index 7a8806d4..574a3899 100644
--- a/unilabos/ros/nodes/presets/host_node.py
+++ b/unilabos/ros/nodes/presets/host_node.py
@@ -734,46 +734,116 @@ class HostNode(BaseROS2DeviceNode):
     def get_result_callback(self, item: "QueueItem", action_id: str, future) -> None:
         """获取结果回调"""
         job_id = item.job_id
-        result_msg = future.result().result
-        result_data = convert_from_ros_msg(result_msg)
-        status = "success"
-        return_info_str = result_data.get("return_info")
-        if return_info_str is not None:
-            try:
-                return_info = json.loads(return_info_str)
-                suc = return_info.get("suc", False)
-                if not suc:
-                    status = "failed"
-            except json.JSONDecodeError:
+
+        try:
+            result = future.result()
+            result_msg = result.result
+            goal_status = result.status
+
+            # 检查是否是被取消的任务
+            if goal_status == GoalStatus.STATUS_CANCELED:
+                self.lab_logger().info(f"[Host Node] Goal {action_id} ({job_id[:8]}) was cancelled")
                 status = "failed"
-                return_info = serialize_result_info("", False, result_data)
-                self.lab_logger().critical("错误的return_info类型,请断点修复")
-        else:
-            # 无 return_info 字段时,回退到 success 字段(若存在)
-            suc_field = result_data.get("success")
-            if isinstance(suc_field, bool):
-                status = "success" if suc_field else "failed"
-                return_info = serialize_result_info("", suc_field, result_data)
+                return_info = serialize_result_info("Job was cancelled", False, {})
             else:
-                # 最保守的回退:标记失败并返回空JSON
-                status = "failed"
-                return_info = serialize_result_info("缺少return_info", False, result_data)
+                result_data = convert_from_ros_msg(result_msg)
+                status = "success"
+                return_info_str = result_data.get("return_info")
+                if return_info_str is not None:
+                    try:
+                        return_info = json.loads(return_info_str)
+                        suc = return_info.get("suc", False)
+                        if not suc:
+                            status = "failed"
+                    except json.JSONDecodeError:
+                        status = "failed"
+                        return_info = serialize_result_info("", False, result_data)
+                        self.lab_logger().critical("错误的return_info类型,请断点修复")
+                else:
+                    # 无 return_info 字段时,回退到 success 字段(若存在)
+                    suc_field = result_data.get("success")
+                    if isinstance(suc_field, bool):
+                        status = "success" if suc_field else "failed"
+                        return_info = serialize_result_info("", suc_field, result_data)
+                    else:
+                        # 最保守的回退:标记失败并返回空JSON
+                        status = "failed"
+                        return_info = serialize_result_info("缺少return_info", False, result_data)
 
-        self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id}): {status}")
-        self.lab_logger().debug(f"[Host Node] Result data: {result_data}")
+            self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}")
+            if goal_status != GoalStatus.STATUS_CANCELED:
+                self.lab_logger().debug(f"[Host Node] Result data: {result_data}")
 
-        if job_id:
+            # 清理 _goals 中的记录
+            if job_id in self._goals:
+                del self._goals[job_id]
+                self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals")
+
+            # 发布状态到桥接器
+            if job_id:
+                for bridge in self.bridges:
+                    if hasattr(bridge, "publish_job_status"):
+                        if goal_status == GoalStatus.STATUS_CANCELED:
+                            bridge.publish_job_status({}, item, status, return_info)
+                        else:
+                            bridge.publish_job_status(result_data, item, status, return_info)
+
+        except Exception as e:
+            self.lab_logger().error(
+                f"[Host Node] Error in get_result_callback for {action_id} ({job_id[:8]}): {str(e)}"
+            )
+            import traceback
+
+            self.lab_logger().error(traceback.format_exc())
+
+            # 清理 _goals 中的记录
+            if job_id in self._goals:
+                del self._goals[job_id]
+
+            # 发布失败状态
             for bridge in self.bridges:
                 if hasattr(bridge, "publish_job_status"):
-                    bridge.publish_job_status(result_data, item, status, return_info)
+                    bridge.publish_job_status(
+                        {}, item, "failed", serialize_result_info(f"Callback error: {str(e)}", False, {})
+                    )
 
-    def cancel_goal(self, goal_uuid: str) -> None:
-        """取消目标"""
+    def cancel_goal(self, goal_uuid: str) -> bool:
+        """
+        取消目标
+
+        Args:
+            goal_uuid: 目标UUID(job_id)
+
+        Returns:
+            bool: 如果找到目标并发起取消请求返回True,否则返回False
+        """
         if goal_uuid in self._goals:
-            self.lab_logger().info(f"[Host Node] Cancelling goal {goal_uuid}")
-            self._goals[goal_uuid].cancel_goal_async()
+            self.lab_logger().info(f"[Host Node] Cancelling goal {goal_uuid[:8]}")
+            goal_handle = self._goals[goal_uuid]
+
+            # 发起异步取消请求
+            cancel_future = goal_handle.cancel_goal_async()
+
+            # 添加取消完成的回调
+            cancel_future.add_done_callback(lambda future: self._cancel_goal_callback(goal_uuid, future))
+            return True
         else:
-            self.lab_logger().warning(f"[Host Node] Goal {goal_uuid} not found, cannot cancel")
+            self.lab_logger().warning(f"[Host Node] Goal {goal_uuid[:8]} not found in _goals, cannot cancel")
+            return False
+
+    def _cancel_goal_callback(self, goal_uuid: str, future) -> None:
+        """取消目标的回调"""
+        try:
+            cancel_response = future.result()
+            if cancel_response.goals_canceling:
+                self.lab_logger().info(f"[Host Node] Goal {goal_uuid[:8]} cancel request accepted")
+            else:
+                self.lab_logger().warning(f"[Host Node] Goal {goal_uuid[:8]} cancel request rejected")
+        except Exception as e:
+            self.lab_logger().error(f"[Host Node] Error cancelling goal {goal_uuid[:8]}: {str(e)}")
+            import traceback
+
+            self.lab_logger().error(traceback.format_exc())
 
     def get_goal_status(self, job_id: str) -> int:
         """获取目标状态"""
diff --git a/unilabos/ros/nodes/resource_tracker.py b/unilabos/ros/nodes/resource_tracker.py
index 856c6ef5..adfc1467 100644
--- a/unilabos/ros/nodes/resource_tracker.py
+++ b/unilabos/ros/nodes/resource_tracker.py
@@ -848,15 +848,13 @@ class DeviceNodeResourceTracker(object):
             extra: extra字典值
         """
         if isinstance(resource, dict):
-            # ⭐ 修复:合并extra而不是覆盖
-            current_extra = resource.get("extra", {})
-            current_extra.update(extra)
-            resource["extra"] = current_extra
+            c_extra = resource.get("extra", {})
+            c_extra.update(extra)
+            resource["extra"] = c_extra
         else:
-            # ⭐ 修复:合并unilabos_extra而不是覆盖
-            current_extra = getattr(resource, "unilabos_extra", {})
-            current_extra.update(extra)
-            setattr(resource, "unilabos_extra", current_extra)
+            c_extra = getattr(resource, "unilabos_extra", {})
+            c_extra.update(extra)
+            setattr(resource, "unilabos_extra", c_extra)
 
     def _traverse_and_process(self, resource, process_func) -> int:
         """