From 4888f02c09866f8d857c367bae9e54ad60197ce7 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 16 Sep 2025 09:47:06 +0800 Subject: [PATCH] add server timeout --- unilabos/app/ws_client.py | 198 +++++++++++++++++++++++++++++++------- 1 file changed, 163 insertions(+), 35 deletions(-) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 0e5e8ec9..3659bc06 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -30,6 +30,13 @@ from unilabos.config.config import WSConfig, HTTPConfig, BasicConfig from unilabos.utils import logger +def format_job_log(job_id: str, task_id: str = "", device_id: str = "", action_name: str = "") -> str: + """格式化job日志信息:jobid[:4]-taskid[:4] device_id/action_name""" + job_part = f"{job_id[:4]}-{task_id[:4]}" if task_id else job_id[:4] + device_part = f"{device_id}/{action_name}" if device_id and action_name else "" + return f"{job_part} {device_part}".strip() + + class JobStatus(Enum): """任务状态枚举""" @@ -65,11 +72,20 @@ class JobInfo: status: JobStatus start_time: float last_update_time: float = field(default_factory=time.time) + ready_timeout: Optional[float] = None # READY状态的超时时间 def update_timestamp(self): """更新最后更新时间""" self.last_update_time = time.time() + def set_ready_timeout(self, timeout_seconds: int = 10): + """设置READY状态超时时间""" + self.ready_timeout = time.time() + timeout_seconds + + def is_ready_timeout(self) -> bool: + """检查READY状态是否超时""" + return self.status == JobStatus.READY and self.ready_timeout is not None and time.time() > self.ready_timeout + @dataclass class WebSocketMessage: @@ -107,7 +123,8 @@ class DeviceActionManager: self.device_queues[device_key] = [] job_info.status = JobStatus.QUEUE self.device_queues[device_key].append(job_info) - logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} queued for {device_key}") + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.info(f"[DeviceActionManager] Job {job_log} queued for {device_key}") return False # 检查是否有排队的任务 @@ -115,15 +132,18 @@ class DeviceActionManager: # 有排队的任务,加入队列末尾 job_info.status = JobStatus.QUEUE self.device_queues[device_key].append(job_info) - logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} queued for {device_key}") + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.info(f"[DeviceActionManager] Job {job_log} queued for {device_key}") return False # 没有正在执行或排队的任务,可以立即执行 # 将其状态设为READY并占位,防止后续job也被判断为free job_info.status = JobStatus.READY job_info.update_timestamp() + job_info.set_ready_timeout(10) # 设置10秒超时 self.active_jobs[device_key] = job_info - logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} can start immediately for {device_key}") + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.info(f"[DeviceActionManager] Job {job_log} can start immediately for {device_key}") return True def start_job(self, job_id: str) -> bool: @@ -141,21 +161,23 @@ class DeviceActionManager: # 检查job的状态是否正确 if job_info.status != JobStatus.READY: - logger.error( - f"[DeviceActionManager] Job {job_id[:4]} is not in READY status, current: {job_info.status}" - ) + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.error(f"[DeviceActionManager] Job {job_log} is not in READY status, current: {job_info.status}") return False # 检查设备上是否是这个job if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id: - logger.error(f"[DeviceActionManager] Job {job_id[:4]} is not the active job for {device_key}") + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}") return False # 开始执行任务,将状态从READY转换为STARTED job_info.status = JobStatus.STARTED job_info.update_timestamp() + job_info.ready_timeout = None # 清除超时时间 - logger.info(f"[DeviceActionManager] Job {job_id[:4]} started for {device_key}") + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.info(f"[DeviceActionManager] Job {job_log} started for {device_key}") return True def end_job(self, job_id: str) -> Optional[JobInfo]: @@ -177,9 +199,11 @@ class DeviceActionManager: job_info.update_timestamp() # 从all_jobs中移除已结束的job del self.all_jobs[job_id] - logger.info(f"[DeviceActionManager] Job {job_id[:4]} ended for {device_key}") + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.info(f"[DeviceActionManager] Job {job_log} ended for {device_key}") else: - logger.warning(f"[DeviceActionManager] Job {job_id[:4]} was not active for {device_key}") + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.warning(f"[DeviceActionManager] Job {job_log} was not active for {device_key}") # 检查队列中是否有等待的任务 if device_key in self.device_queues and self.device_queues[device_key]: @@ -187,8 +211,12 @@ class DeviceActionManager: # 将下一个job设置为READY状态并放入active_jobs next_job.status = JobStatus.READY next_job.update_timestamp() + next_job.set_ready_timeout(10) # 设置10秒超时 self.active_jobs[device_key] = next_job - logger.info(f"[DeviceActionManager] Next job {next_job.job_id[:4]} can start for {device_key}") + next_job_log = format_job_log( + next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name + ) + logger.info(f"[DeviceActionManager] Next job {next_job_log} can start for {device_key}") return next_job return None @@ -229,7 +257,8 @@ class DeviceActionManager: # job_info.status = JobStatus.ENDED # # 从all_jobs中移除 # del self.all_jobs[job_id] - # logger.info(f"[DeviceActionManager] Active job {job_id[:4]} cancelled for {device_key}") + # job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + # logger.info(f"[DeviceActionManager] Active job {job_log} cancelled for {device_key}") # # 启动下一个任务 # if device_key in self.device_queues and self.device_queues[device_key]: @@ -237,8 +266,11 @@ class DeviceActionManager: # # 将下一个job设置为READY状态并放入active_jobs # next_job.status = JobStatus.READY # next_job.update_timestamp() + # next_job.set_ready_timeout(10) # self.active_jobs[device_key] = next_job - # logger.info(f"[DeviceActionManager] Next job {next_job.job_id[:4]} can start after cancel") + # next_job_log = format_job_log(next_job.job_id, next_job.task_id, + # next_job.device_id, next_job.action_name) + # logger.info(f"[DeviceActionManager] Next job {next_job_log} can start after cancel") # return True pass @@ -250,10 +282,14 @@ class DeviceActionManager: job_info.status = JobStatus.ENDED # 从all_jobs中移除 del self.all_jobs[job_id] - logger.info(f"[DeviceActionManager] Queued job {job_id[:4]} cancelled for {device_key}") + job_log = format_job_log( + job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name + ) + logger.info(f"[DeviceActionManager] Queued job {job_log} cancelled for {device_key}") return True - logger.warning(f"[DeviceActionManager] Job {job_id[:4]} not found in active or queued jobs") + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.warning(f"[DeviceActionManager] Job {job_log} not found in active or queued jobs") return False def cancel_jobs_by_task_id(self, task_id: str) -> List[str]: @@ -282,6 +318,27 @@ class DeviceActionManager: return cancelled_job_ids + def check_ready_timeouts(self) -> List[JobInfo]: + """检查READY状态超时的任务,仅检测不处理""" + timeout_jobs = [] + + with self.lock: + # 统计READY状态的任务数量 + ready_jobs_count = sum(1 for job in self.active_jobs.values() if job.status == JobStatus.READY) + if ready_jobs_count > 0: + logger.trace(f"[DeviceActionManager] Checking {ready_jobs_count} READY jobs for timeout") # type: ignore # noqa: E501 + + # 找到所有超时的READY任务(只检测,不处理) + for job_info in self.active_jobs.values(): + if job_info.is_ready_timeout(): + timeout_jobs.append(job_info) + job_log = format_job_log( + job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name + ) + logger.warning(f"[DeviceActionManager] Job {job_log} READY timeout detected") + + return timeout_jobs + class MessageProcessor: """消息处理线程 - 处理WebSocket消息,划分任务执行和任务队列""" @@ -541,19 +598,19 @@ class MessageProcessor: # 添加到设备管理器 can_start_immediately = self.device_manager.add_queue_request(job_info) - # 发送响应 + job_log = format_job_log(job_id, task_id, device_id, action_name) if can_start_immediately: # 可以立即开始 await self._send_action_state_response( device_id, action_name, task_id, job_id, "query_action_status", True, 0 ) - logger.info(f"[MessageProcessor] Job {job_id[:4]} can start immediately") + logger.info(f"[MessageProcessor] Job {job_log} can start immediately") else: # 需要排队 await self._send_action_state_response( device_id, action_name, task_id, job_id, "query_action_status", False, 10 ) - logger.info(f"[MessageProcessor] Job {job_id[:4]} queued") + logger.info(f"[MessageProcessor] Job {job_log} queued") # 通知QueueProcessor有新的队列更新 if self.queue_processor: @@ -564,13 +621,13 @@ class MessageProcessor: try: req = JobAddReq(**data) - # 开始执行任务 + job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action) success = self.device_manager.start_job(req.job_id) if not success: - logger.error(f"[MessageProcessor] Failed to start job {req.job_id[:4]}") + logger.error(f"[MessageProcessor] Failed to start job {job_log}") return - logger.info(f"[MessageProcessor] Starting job {req.job_id[:4]} for {req.device_id}/{req.action}") + logger.info(f"[MessageProcessor] Starting job {job_log}") # 创建HostNode任务 device_action_key = f"/devices/{req.device_id}/{req.action}" @@ -602,7 +659,8 @@ class MessageProcessor: # job_start出错时,需要通过正确的publish_job_status方法来处理 if "req" in locals() and "queue_item" in locals(): - logger.info(f"[MessageProcessor] Publishing failed status for job {req.job_id[:4]}") + job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action) + logger.info(f"[MessageProcessor] Publishing failed status for job {job_log}") if self.websocket_client: # 使用完整的错误信息,与原版本一致 @@ -639,7 +697,10 @@ class MessageProcessor: True, 0, ) - logger.info(f"[MessageProcessor] Started next job {next_job.job_id[:4]} after error") + next_job_log = format_job_log( + next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name + ) + logger.info(f"[MessageProcessor] Started next job {next_job_log} after error") # 通知QueueProcessor有队列更新 if self.queue_processor: @@ -655,6 +716,15 @@ class MessageProcessor: logger.info(f"[MessageProcessor] Cancel request - task_id: {task_id}, job_id: {job_id}") if job_id: + # 获取job信息用于日志 + job_info = self.device_manager.get_job_info(job_id) + job_log = format_job_log( + job_id, + job_info.task_id if job_info else "", + job_info.device_id if job_info else "", + job_info.action_name if job_info else "", + ) + # 按job_id取消单个job success = self.device_manager.cancel_job(job_id) if success: @@ -662,13 +732,13 @@ class MessageProcessor: host_node = HostNode.get_instance(0) if host_node: host_node.cancel_goal(job_id) - logger.info(f"[MessageProcessor] Job {job_id[:4]} cancelled") + logger.info(f"[MessageProcessor] Job {job_log} cancelled") # 通知QueueProcessor有队列更新 if self.queue_processor: self.queue_processor.notify_queue_update() else: - logger.warning(f"[MessageProcessor] Failed to cancel job {job_id[:4]}") + logger.warning(f"[MessageProcessor] Failed to cancel job {job_log}") elif task_id: # 按task_id取消所有相关job @@ -732,6 +802,7 @@ class QueueProcessor: def __init__(self, device_manager: DeviceActionManager, message_processor: MessageProcessor): self.device_manager = device_manager self.message_processor = message_processor + self.websocket_client = None # 延迟设置 # 线程控制 self.is_running = False @@ -742,6 +813,10 @@ class QueueProcessor: logger.info("[QueueProcessor] Initialized") + def set_websocket_client(self, websocket_client: "WebSocketClient"): + """设置WebSocket客户端引用""" + self.websocket_client = websocket_client + def start(self) -> None: """启动队列处理线程""" if self.is_running: @@ -766,6 +841,36 @@ class QueueProcessor: while self.is_running: try: + # 检查READY状态超时的任务 + timeout_jobs = self.device_manager.check_ready_timeouts() + if timeout_jobs: + logger.info(f"[QueueProcessor] Found {len(timeout_jobs)} READY jobs that timed out") + # 为超时的job发布失败状态,通过正常job完成流程处理 + for timeout_job in timeout_jobs: + timeout_item = QueueItem( + task_type="job_call_back_status", + device_id=timeout_job.device_id, + action_name=timeout_job.action_name, + task_id=timeout_job.task_id, + job_id=timeout_job.job_id, + device_action_key=timeout_job.device_action_key, + ) + # 发布超时失败状态,这会触发正常的job完成流程 + if self.websocket_client: + job_log = format_job_log( + timeout_job.job_id, timeout_job.task_id, timeout_job.device_id, timeout_job.action_name + ) + logger.info(f"[QueueProcessor] Publishing timeout failure for job {job_log}") + self.websocket_client.publish_job_status( + {}, + timeout_item, + "failed", + serialize_result_info("Job READY state timeout after 10 seconds", False, {}), + ) + + # 立即触发状态更新 + self.notify_queue_update() + # 发送正在执行任务的running状态 self._send_running_status() @@ -811,7 +916,8 @@ class QueueProcessor: }, } self.message_processor.send_message(message) - logger.trace(f"[QueueProcessor] Sent running status for job {job_info.job_id[:4]}") # type: ignore + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) + logger.trace(f"[QueueProcessor] Sent running status for job {job_log}") # type: ignore def _send_busy_status(self): """发送排队任务的busy状态""" @@ -838,14 +944,24 @@ class QueueProcessor: }, } success = self.message_processor.send_message(message) + job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name) if success: - logger.debug(f"[QueueProcessor] Sent busy/need_more for queued job {job_info.job_id[:4]}") + logger.debug(f"[QueueProcessor] Sent busy/need_more for queued job {job_log}") else: - logger.warning(f"[QueueProcessor] Failed to send busy status for job {job_info.job_id[:4]}") + logger.warning(f"[QueueProcessor] Failed to send busy status for job {job_log}") def handle_job_completed(self, job_id: str, status: str) -> None: """处理任务完成""" - logger.info(f"[QueueProcessor] Job {job_id[:4]} completed with status: {status}") + # 获取job信息用于日志 + job_info = self.device_manager.get_job_info(job_id) + job_log = format_job_log( + job_id, + job_info.task_id if job_info else "", + job_info.device_id if job_info else "", + job_info.action_name if job_info else "", + ) + + logger.info(f"[QueueProcessor] Job {job_log} completed with status: {status}") # 结束任务,获取下一个可执行的任务 next_job = self.device_manager.end_job(job_id) @@ -865,7 +981,8 @@ class QueueProcessor: }, } self.message_processor.send_message(message) - logger.info(f"[QueueProcessor] Notified next job {next_job.job_id[:4]} can start") + next_job_log = format_job_log(next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name) + logger.info(f"[QueueProcessor] Notified next job {next_job_log} can start") # 立即触发下一轮状态检查 self.notify_queue_update() @@ -899,6 +1016,7 @@ class WebSocketClient(BaseCommunicationClient): # 设置相互引用 self.message_processor.set_queue_processor(self.queue_processor) self.message_processor.set_websocket_client(self) + self.queue_processor.set_websocket_client(self) logger.info(f"[WebSocketClient] Client_id: {self.client_id}") @@ -997,7 +1115,7 @@ class WebSocketClient(BaseCommunicationClient): logger.info(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}") - # 通知队列处理器job完成 + # 通知队列处理器job完成(包括timeout的job) self.queue_processor.handle_job_completed(item.job_id, status) # 发送job状态消息 @@ -1016,7 +1134,8 @@ class WebSocketClient(BaseCommunicationClient): } self.message_processor.send_message(message) - logger.debug(f"[WebSocketClient] Job status published: {item.job_id[:4]} - {status}") + job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name) + logger.debug(f"[WebSocketClient] Job status published: {job_log} - {status}") def send_ping(self, ping_id: str, timestamp: float) -> None: """发送ping消息""" @@ -1030,9 +1149,18 @@ class WebSocketClient(BaseCommunicationClient): def cancel_goal(self, job_id: str) -> None: """取消指定的任务""" - logger.debug(f"[WebSocketClient] Cancel goal request for job_id: {job_id[:4]}") + # 获取job信息用于日志 + job_info = self.device_manager.get_job_info(job_id) + job_log = format_job_log( + job_id, + job_info.task_id if job_info else "", + job_info.device_id if job_info else "", + job_info.action_name if job_info else "", + ) + + logger.debug(f"[WebSocketClient] Cancel goal request for job: {job_log}") success = self.device_manager.cancel_job(job_id) if success: - logger.info(f"[WebSocketClient] Job {job_id[:4]} cancelled successfully") + logger.info(f"[WebSocketClient] Job {job_log} cancelled successfully") else: - logger.warning(f"[WebSocketClient] Failed to cancel job {job_id[:4]}") + logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}")