mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 21:11:12 +00:00
fix cancel error
This commit is contained in:
@@ -261,29 +261,28 @@ class DeviceActionManager:
|
||||
device_key = job_info.device_action_key
|
||||
|
||||
# 如果是正在执行的任务
|
||||
if (
|
||||
device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id
|
||||
): # 后面需要和cancel_goal进行联动,而不是在这里进行处理,现在默认等待这个job结束
|
||||
# del self.active_jobs[device_key]
|
||||
# job_info.status = JobStatus.ENDED
|
||||
# # 从all_jobs中移除
|
||||
# del self.all_jobs[job_id]
|
||||
# job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
|
||||
# logger.info(f"[DeviceActionManager] Active job {job_log} cancelled for {device_key}")
|
||||
if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id:
|
||||
# 清理active job状态
|
||||
del self.active_jobs[device_key]
|
||||
job_info.status = JobStatus.ENDED
|
||||
# 从all_jobs中移除
|
||||
del self.all_jobs[job_id]
|
||||
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
|
||||
logger.info(f"[DeviceActionManager] Active job {job_log} cancelled for {device_key}")
|
||||
|
||||
# # 启动下一个任务
|
||||
# if device_key in self.device_queues and self.device_queues[device_key]:
|
||||
# next_job = self.device_queues[device_key].pop(0)
|
||||
# # 将下一个job设置为READY状态并放入active_jobs
|
||||
# next_job.status = JobStatus.READY
|
||||
# next_job.update_timestamp()
|
||||
# next_job.set_ready_timeout(10)
|
||||
# self.active_jobs[device_key] = next_job
|
||||
# next_job_log = format_job_log(next_job.job_id, next_job.task_id,
|
||||
# next_job.device_id, next_job.action_name)
|
||||
# logger.info(f"[DeviceActionManager] Next job {next_job_log} can start after cancel")
|
||||
# return True
|
||||
pass
|
||||
# 启动下一个任务
|
||||
if device_key in self.device_queues and self.device_queues[device_key]:
|
||||
next_job = self.device_queues[device_key].pop(0)
|
||||
# 将下一个job设置为READY状态并放入active_jobs
|
||||
next_job.status = JobStatus.READY
|
||||
next_job.update_timestamp()
|
||||
next_job.set_ready_timeout(10)
|
||||
self.active_jobs[device_key] = next_job
|
||||
next_job_log = format_job_log(
|
||||
next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
|
||||
)
|
||||
logger.info(f"[DeviceActionManager] Next job {next_job_log} can start after cancel")
|
||||
return True
|
||||
|
||||
# 如果是排队中的任务
|
||||
elif device_key in self.device_queues:
|
||||
@@ -741,31 +740,51 @@ class MessageProcessor:
|
||||
job_info.action_name if job_info else "",
|
||||
)
|
||||
|
||||
# 按job_id取消单个job
|
||||
# 先通知HostNode取消ROS2 action(如果存在)
|
||||
host_node = HostNode.get_instance(0)
|
||||
ros_cancel_success = False
|
||||
if host_node:
|
||||
ros_cancel_success = host_node.cancel_goal(job_id)
|
||||
if ros_cancel_success:
|
||||
logger.info(f"[MessageProcessor] ROS2 cancel request sent for job {job_log}")
|
||||
else:
|
||||
logger.debug(
|
||||
f"[MessageProcessor] Job {job_log} not in ROS2 goals " "(may be queued or already finished)"
|
||||
)
|
||||
|
||||
# 按job_id取消单个job(清理状态机)
|
||||
success = self.device_manager.cancel_job(job_id)
|
||||
if success:
|
||||
# 通知HostNode取消
|
||||
host_node = HostNode.get_instance(0)
|
||||
if host_node:
|
||||
host_node.cancel_goal(job_id)
|
||||
logger.info(f"[MessageProcessor] Job {job_log} cancelled")
|
||||
logger.info(f"[MessageProcessor] Job {job_log} cancelled from queue/active list")
|
||||
|
||||
# 通知QueueProcessor有队列更新
|
||||
if self.queue_processor:
|
||||
self.queue_processor.notify_queue_update()
|
||||
else:
|
||||
logger.warning(f"[MessageProcessor] Failed to cancel job {job_log}")
|
||||
logger.warning(f"[MessageProcessor] Failed to cancel job {job_log} from queue")
|
||||
|
||||
elif task_id:
|
||||
# 按task_id取消所有相关job
|
||||
# 先通知HostNode取消所有ROS2 actions
|
||||
# 需要先获取所有相关job_ids
|
||||
jobs_to_cancel = []
|
||||
with self.device_manager.lock:
|
||||
jobs_to_cancel = [
|
||||
job_info for job_info in self.device_manager.all_jobs.values() if job_info.task_id == task_id
|
||||
]
|
||||
|
||||
host_node = HostNode.get_instance(0)
|
||||
if host_node and jobs_to_cancel:
|
||||
ros_cancelled_count = 0
|
||||
for job_info in jobs_to_cancel:
|
||||
if host_node.cancel_goal(job_info.job_id):
|
||||
ros_cancelled_count += 1
|
||||
logger.info(
|
||||
f"[MessageProcessor] Sent ROS2 cancel for " f"{ros_cancelled_count}/{len(jobs_to_cancel)} jobs"
|
||||
)
|
||||
|
||||
# 按task_id取消所有相关job(清理状态机)
|
||||
cancelled_job_ids = self.device_manager.cancel_jobs_by_task_id(task_id)
|
||||
if cancelled_job_ids:
|
||||
# 通知HostNode取消所有job
|
||||
host_node = HostNode.get_instance(0)
|
||||
if host_node:
|
||||
for cancelled_job_id in cancelled_job_ids:
|
||||
host_node.cancel_goal(cancelled_job_id)
|
||||
|
||||
logger.info(f"[MessageProcessor] Cancelled {len(cancelled_job_ids)} jobs for task_id: {task_id}")
|
||||
|
||||
# 通知QueueProcessor有队列更新
|
||||
@@ -1056,11 +1075,19 @@ class QueueProcessor:
|
||||
"""处理任务完成"""
|
||||
# 获取job信息用于日志
|
||||
job_info = self.device_manager.get_job_info(job_id)
|
||||
|
||||
# 如果job不存在,说明可能已被手动取消
|
||||
if not job_info:
|
||||
logger.debug(
|
||||
f"[QueueProcessor] Job {job_id[:8]} not found in manager " "(may have been cancelled manually)"
|
||||
)
|
||||
return
|
||||
|
||||
job_log = format_job_log(
|
||||
job_id,
|
||||
job_info.task_id if job_info else "",
|
||||
job_info.device_id if job_info else "",
|
||||
job_info.action_name if job_info else "",
|
||||
job_info.task_id,
|
||||
job_info.device_id,
|
||||
job_info.action_name,
|
||||
)
|
||||
|
||||
logger.info(f"[QueueProcessor] Job {job_log} completed with status: {status}")
|
||||
|
||||
@@ -734,7 +734,18 @@ class HostNode(BaseROS2DeviceNode):
|
||||
def get_result_callback(self, item: "QueueItem", action_id: str, future) -> None:
|
||||
"""获取结果回调"""
|
||||
job_id = item.job_id
|
||||
result_msg = future.result().result
|
||||
|
||||
try:
|
||||
result = future.result()
|
||||
result_msg = result.result
|
||||
goal_status = result.status
|
||||
|
||||
# 检查是否是被取消的任务
|
||||
if goal_status == GoalStatus.STATUS_CANCELED:
|
||||
self.lab_logger().info(f"[Host Node] Goal {action_id} ({job_id[:8]}) was cancelled")
|
||||
status = "failed"
|
||||
return_info = serialize_result_info("Job was cancelled", False, {})
|
||||
else:
|
||||
result_data = convert_from_ros_msg(result_msg)
|
||||
status = "success"
|
||||
return_info_str = result_data.get("return_info")
|
||||
@@ -759,21 +770,80 @@ class HostNode(BaseROS2DeviceNode):
|
||||
status = "failed"
|
||||
return_info = serialize_result_info("缺少return_info", False, result_data)
|
||||
|
||||
self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id}): {status}")
|
||||
self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}")
|
||||
if goal_status != GoalStatus.STATUS_CANCELED:
|
||||
self.lab_logger().debug(f"[Host Node] Result data: {result_data}")
|
||||
|
||||
# 清理 _goals 中的记录
|
||||
if job_id in self._goals:
|
||||
del self._goals[job_id]
|
||||
self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals")
|
||||
|
||||
# 发布状态到桥接器
|
||||
if job_id:
|
||||
for bridge in self.bridges:
|
||||
if hasattr(bridge, "publish_job_status"):
|
||||
if goal_status == GoalStatus.STATUS_CANCELED:
|
||||
bridge.publish_job_status({}, item, status, return_info)
|
||||
else:
|
||||
bridge.publish_job_status(result_data, item, status, return_info)
|
||||
|
||||
def cancel_goal(self, goal_uuid: str) -> None:
|
||||
"""取消目标"""
|
||||
except Exception as e:
|
||||
self.lab_logger().error(
|
||||
f"[Host Node] Error in get_result_callback for {action_id} ({job_id[:8]}): {str(e)}"
|
||||
)
|
||||
import traceback
|
||||
|
||||
self.lab_logger().error(traceback.format_exc())
|
||||
|
||||
# 清理 _goals 中的记录
|
||||
if job_id in self._goals:
|
||||
del self._goals[job_id]
|
||||
|
||||
# 发布失败状态
|
||||
for bridge in self.bridges:
|
||||
if hasattr(bridge, "publish_job_status"):
|
||||
bridge.publish_job_status(
|
||||
{}, item, "failed", serialize_result_info(f"Callback error: {str(e)}", False, {})
|
||||
)
|
||||
|
||||
def cancel_goal(self, goal_uuid: str) -> bool:
|
||||
"""
|
||||
取消目标
|
||||
|
||||
Args:
|
||||
goal_uuid: 目标UUID(job_id)
|
||||
|
||||
Returns:
|
||||
bool: 如果找到目标并发起取消请求返回True,否则返回False
|
||||
"""
|
||||
if goal_uuid in self._goals:
|
||||
self.lab_logger().info(f"[Host Node] Cancelling goal {goal_uuid}")
|
||||
self._goals[goal_uuid].cancel_goal_async()
|
||||
self.lab_logger().info(f"[Host Node] Cancelling goal {goal_uuid[:8]}")
|
||||
goal_handle = self._goals[goal_uuid]
|
||||
|
||||
# 发起异步取消请求
|
||||
cancel_future = goal_handle.cancel_goal_async()
|
||||
|
||||
# 添加取消完成的回调
|
||||
cancel_future.add_done_callback(lambda future: self._cancel_goal_callback(goal_uuid, future))
|
||||
return True
|
||||
else:
|
||||
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid} not found, cannot cancel")
|
||||
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid[:8]} not found in _goals, cannot cancel")
|
||||
return False
|
||||
|
||||
def _cancel_goal_callback(self, goal_uuid: str, future) -> None:
|
||||
"""取消目标的回调"""
|
||||
try:
|
||||
cancel_response = future.result()
|
||||
if cancel_response.goals_canceling:
|
||||
self.lab_logger().info(f"[Host Node] Goal {goal_uuid[:8]} cancel request accepted")
|
||||
else:
|
||||
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid[:8]} cancel request rejected")
|
||||
except Exception as e:
|
||||
self.lab_logger().error(f"[Host Node] Error cancelling goal {goal_uuid[:8]}: {str(e)}")
|
||||
import traceback
|
||||
|
||||
self.lab_logger().error(traceback.format_exc())
|
||||
|
||||
def get_goal_status(self, job_id: str) -> int:
|
||||
"""获取目标状态"""
|
||||
|
||||
@@ -848,15 +848,13 @@ class DeviceNodeResourceTracker(object):
|
||||
extra: extra字典值
|
||||
"""
|
||||
if isinstance(resource, dict):
|
||||
# ⭐ 修复:合并extra而不是覆盖
|
||||
current_extra = resource.get("extra", {})
|
||||
current_extra.update(extra)
|
||||
resource["extra"] = current_extra
|
||||
c_extra = resource.get("extra", {})
|
||||
c_extra.update(extra)
|
||||
resource["extra"] = c_extra
|
||||
else:
|
||||
# ⭐ 修复:合并unilabos_extra而不是覆盖
|
||||
current_extra = getattr(resource, "unilabos_extra", {})
|
||||
current_extra.update(extra)
|
||||
setattr(resource, "unilabos_extra", current_extra)
|
||||
c_extra = getattr(resource, "unilabos_extra", {})
|
||||
c_extra.update(extra)
|
||||
setattr(resource, "unilabos_extra", c_extra)
|
||||
|
||||
def _traverse_and_process(self, resource, process_func) -> int:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user