diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index f48b74b9..69c70b13 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -60,6 +60,10 @@ class TaskScheduler: self.active_jobs = {} # job_id -> 任务信息 self.cancel_events = {} # job_id -> asyncio.Event for cancellation + # 立即执行标记字典 - device_id+action_name -> timestamp + self.immediate_execution_flags = {} # 存储需要立即执行的设备动作组合 + self.immediate_execution_lock = threading.Lock() # 立即执行标记锁 + # 队列处理器 self.queue_processor_thread = None self.queue_running = False @@ -121,11 +125,16 @@ class TaskScheduler: await asyncio.sleep(0.2) # 队列为空时等待 continue + with self.immediate_execution_lock: + expired_keys = [k for k, v in self.immediate_execution_flags.items() if current_time > v] + for k in expired_keys: + del self.immediate_execution_flags[k] + immediate_execution = self.immediate_execution_flags.copy() # 处理每个任务 for item in items_to_process: try: # 检查是否到了执行时间,是我们本地的执行时间,按顺序填入 - if current_time < item.next_run_time: + if current_time < item.next_run_time and item.device_action_key not in immediate_execution: # 还没到执行时间,保留在队列中(保持原有顺序) items_to_requeue.append(item) continue @@ -145,12 +154,14 @@ class TaskScheduler: item.next_run_time = current_time + 10 # 10秒后再次执行 item.retry_count += 1 items_to_requeue.append(item) - logger.critical( - f"[TaskScheduler] Re-queued {item.job_id} {item.task_type} for {item.device_action_key}" + logger.trace( # type: ignore + f"[TaskScheduler] Re-queued {item.job_id} {item.task_type} " + f"for {item.device_action_key}" ) else: - logger.warning( - f"[TaskScheduler] Completed {item.job_id} {item.task_type} for {item.device_action_key}" + logger.debug( + f"[TaskScheduler] Completed {item.job_id} {item.task_type} " + f"for {item.device_action_key}" ) except Exception as e: @@ -212,12 +223,12 @@ class TaskScheduler: try: # 检查任务是否还在活跃列表中 if item.job_id not in self.active_jobs: - logger.critical(f"[TaskScheduler] Job {item.job_id} no longer active") + logger.debug(f"[TaskScheduler] Job {item.job_id} no longer active") return False # 检查是否收到取消信号 if item.job_id in self.cancel_events and self.cancel_events[item.job_id].is_set(): - logger.critical(f"[TaskScheduler] Job {item.job_id} cancelled via cancel event") + logger.info(f"[TaskScheduler] Job {item.job_id} cancelled via cancel event") return False # 检查设备状态 @@ -299,7 +310,7 @@ class TaskScheduler: await self._publish_device_action_state( device_id, action_name, task_id, job_id, "query_action_status", True, 0 ) - logger.error(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is free, responded immediately") + logger.debug(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is free, responded immediately") host_node = HostNode.get_instance(0) if not host_node: logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {job_id}") @@ -335,7 +346,10 @@ class TaskScheduler: next_run_time=time.time() + 10, # 10秒后执行 ) self.action_queue.append(queue_item) - logger.error(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is busy, added to polling queue {action_jobs}") + logger.debug( + f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is busy, " + f"added to polling queue {action_jobs}" + ) # 立即发送busy状态 await self._publish_device_action_state( @@ -350,7 +364,7 @@ class TaskScheduler: req = JobAddReq(**data) device_action_key = f"/devices/{req.device_id}/{req.action}" - logger.critical( + logger.info( f"[TaskScheduler] Starting job with job_id: {req.job_id}, " f"device: {req.device_id}, action: {req.action}" ) @@ -370,9 +384,7 @@ class TaskScheduler: try: # 启动callback定时发送 - await self._start_job_callback( - req.job_id, req.device_id, req.action, req.task_id, device_action_key - ) + await self._start_job_callback(req.job_id, req.device_id, req.action, req.task_id, device_action_key) # 创建兼容HostNode的QueueItem对象 job_queue_item = QueueItem( @@ -405,7 +417,7 @@ class TaskScheduler: host_node = HostNode.get_instance(0) if host_node: host_node._device_action_status[device_action_key].job_ids.pop(req.job_id, None) - logger.critical(f"[TaskScheduler] Cleaned up failed job from HostNode: {req.job_id}") + logger.warning(f"[TaskScheduler] Cleaned up failed job from HostNode: {req.job_id}") except Exception as e: logger.error(f"[TaskScheduler] Error handling job start: {str(e)}") @@ -414,7 +426,7 @@ class TaskScheduler: task_id = data.get("task_id") job_id = data.get("job_id") - logger.critical(f"[TaskScheduler] Handling cancel action request - task_id: {task_id}, job_id: {job_id}") + logger.debug(f"[TaskScheduler] Handling cancel action request - task_id: {task_id}, job_id: {job_id}") if not task_id and not job_id: logger.error("[TaskScheduler] cancel_action missing both task_id and job_id") @@ -422,49 +434,49 @@ class TaskScheduler: # 通过job_id取消 if job_id: - logger.critical(f"[TaskScheduler] Cancelling job by job_id: {job_id}") + logger.info(f"[TaskScheduler] Cancelling job by job_id: {job_id}") # 设置取消事件 if job_id in self.cancel_events: self.cancel_events[job_id].set() - logger.critical(f"[TaskScheduler] Set cancel event for job_id: {job_id}") + logger.debug(f"[TaskScheduler] Set cancel event for job_id: {job_id}") else: logger.warning(f"[TaskScheduler] Cancel event not found for job_id: {job_id}") # 停止job callback并发送取消状态 if job_id in self.active_jobs: - logger.critical(f"[TaskScheduler] Found active job for cancellation: {job_id}") + logger.debug(f"[TaskScheduler] Found active job for cancellation: {job_id}") # 调用HostNode的cancel_goal host_node = HostNode.get_instance(0) if host_node: host_node.cancel_goal(job_id) - logger.critical(f"[TaskScheduler] Cancelled goal in HostNode for job_id: {job_id}") + logger.info(f"[TaskScheduler] Cancelled goal in HostNode for job_id: {job_id}") else: logger.error(f"[TaskScheduler] HostNode not available for cancel goal: {job_id}") # 停止callback并发送取消状态 await self._stop_job_callback(job_id, "cancelled", "Job was cancelled by user request") - logger.critical(f"[TaskScheduler] Stopped job callback and sent cancel status for job_id: {job_id}") + logger.info(f"[TaskScheduler] Stopped job callback and sent cancel status for job_id: {job_id}") else: logger.warning(f"[TaskScheduler] Job not found in active jobs for cancellation: {job_id}") # 通过task_id取消(需要查找对应的job_id) if task_id and not job_id: - logger.critical(f"[TaskScheduler] Cancelling jobs by task_id: {task_id}") + logger.debug(f"[TaskScheduler] Cancelling jobs by task_id: {task_id}") jobs_to_cancel = [] for jid, job_info in self.active_jobs.items(): if job_info.get("task_id") == task_id: jobs_to_cancel.append(jid) - logger.critical( + logger.debug( f"[TaskScheduler] Found {len(jobs_to_cancel)} jobs to cancel for task_id {task_id}: {jobs_to_cancel}" ) for jid in jobs_to_cancel: - logger.critical(f"[TaskScheduler] Recursively cancelling job_id: {jid} for task_id: {task_id}") + logger.debug(f"[TaskScheduler] Recursively cancelling job_id: {jid} for task_id: {task_id}") # 递归调用自身来取消每个job await self.handle_cancel_action({"job_id": jid}) - logger.critical(f"[TaskScheduler] Completed cancel action handling - task_id: {task_id}, job_id: {job_id}") + logger.debug(f"[TaskScheduler] Completed cancel action handling - task_id: {task_id}, job_id: {job_id}") # job管理方法 async def _start_job_callback( @@ -472,7 +484,7 @@ class TaskScheduler: ) -> None: """启动job的callback定时发送""" if job_id not in self.active_jobs: - logger.warning(f"[TaskScheduler] Job not found in active jobs when starting callback: {job_id}") + logger.debug(f"[TaskScheduler] Job not found in active jobs when starting callback: {job_id}") return # 检查是否已经启动过callback @@ -497,15 +509,13 @@ class TaskScheduler: with self.action_queue_lock: self.action_queue.append(queue_item) else: - logger.warning(f"[TaskScheduler] Action queue not available for job callback: {job_id}") + logger.debug(f"[TaskScheduler] Action queue not available for job callback: {job_id}") async def _stop_job_callback(self, job_id: str, final_status: str, return_info: Optional[str] = None) -> None: """停止job的callback定时发送并发送最终结果""" - logger.critical( - f"[TaskScheduler] Stopping job callback for job_id: {job_id} with final status: {final_status}" - ) + logger.info(f"[TaskScheduler] Stopping job callback for job_id: {job_id} with final status: {final_status}") if job_id not in self.active_jobs: - logger.warning(f"[TaskScheduler] Job {job_id} not found in active jobs when stopping callback") + logger.debug(f"[TaskScheduler] Job {job_id} not found in active jobs when stopping callback") return job_info = self.active_jobs[job_id] @@ -514,22 +524,20 @@ class TaskScheduler: task_id = job_info["task_id"] device_action_key = job_info["device_action_key"] - logger.critical( + logger.debug( f"[TaskScheduler] Job {job_id} details - device: {device_id}, action: {action_name}, task: {task_id}" ) # 移除活跃任务和取消事件(这会让队列处理器自动停止callback) self.active_jobs.pop(job_id, None) self.cancel_events.pop(job_id, None) - logger.critical(f"[TaskScheduler] Removed job {job_id} from active jobs and cancel events") + logger.debug(f"[TaskScheduler] Removed job {job_id} from active jobs and cancel events") # 发送最终的callback状态 await self._publish_device_action_state( device_id, action_name, task_id, job_id, "job_call_back_status", True, 0 ) - logger.critical( - f"[TaskScheduler] Completed stopping job callback for {job_id} with final status: {final_status}" - ) + logger.debug(f"[TaskScheduler] Completed stopping job callback for {job_id} with final status: {final_status}") # 外部接口方法 def publish_job_status( @@ -537,7 +545,7 @@ class TaskScheduler: ) -> None: """发布作业状态,拦截最终结果(给HostNode调用的接口)""" if not self.message_sender.is_connected(): - logger.warning(f"[TaskScheduler] Not connected, cannot publish job status for job_id: {item.job_id}") + logger.debug(f"[TaskScheduler] Not connected, cannot publish job status for job_id: {item.job_id}") return # 拦截最终结果状态 @@ -545,13 +553,16 @@ class TaskScheduler: host_node = HostNode.get_instance(0) if host_node: host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id) - logger.critical(f"[TaskScheduler] Intercepting final status for job_id: {item.job_id} - {status}") + logger.info(f"[TaskScheduler] Intercepting final status for job_id: {item.job_id} - {status}") + # 给其他同名action至少执行一次的机会 + with self.immediate_execution_lock: + self.immediate_execution_flags[item.device_action_key] = time.time() + 3 # 如果是最终状态,通过_stop_job_callback处理 if self.message_sender.event_loop: - asyncio.run_coroutine_threadsafe(self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop).result() - logger.critical(f"[TaskScheduler] Scheduled final callback stop for job_id: {item.job_id}") - logger.critical(f"[TaskScheduler] Intercepted final job status: {item.job_id} - {status}") - + asyncio.run_coroutine_threadsafe( + self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop + ).result() + # 执行结果信息上传 message = { "action": "job_status", "data": { @@ -570,24 +581,24 @@ class TaskScheduler: loop.create_task(self.message_sender.send_message(message)) except RuntimeError: asyncio.run(self.message_sender.send_message(message)) - logger.critical(f"[TaskScheduler] Executed message send for job_id: {item.job_id} - {status}") + logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore def cancel_goal(self, job_id: str) -> None: """取消指定的任务(给外部调用的接口)""" - logger.critical(f"[TaskScheduler] External cancel request for job_id: {job_id}") + logger.debug(f"[TaskScheduler] External cancel request for job_id: {job_id}") if job_id in self.cancel_events: - logger.critical(f"[TaskScheduler] Found cancel event for job_id: {job_id}, processing cancellation") + logger.debug(f"[TaskScheduler] Found cancel event for job_id: {job_id}, processing cancellation") try: loop = asyncio.get_event_loop() loop.create_task(self.handle_cancel_action({"job_id": job_id})) - logger.critical(f"[TaskScheduler] Scheduled cancel action for job_id: {job_id}") + logger.debug(f"[TaskScheduler] Scheduled cancel action for job_id: {job_id}") except RuntimeError: asyncio.run(self.handle_cancel_action({"job_id": job_id})) - logger.critical(f"[TaskScheduler] Executed cancel action for job_id: {job_id}") - logger.critical(f"[TaskScheduler] Initiated cancel for job_id: {job_id}") + logger.debug(f"[TaskScheduler] Executed cancel action for job_id: {job_id}") + logger.debug(f"[TaskScheduler] Initiated cancel for job_id: {job_id}") else: - logger.warning(f"[TaskScheduler] Job {job_id} not found in cancel events for cancellation") + logger.debug(f"[TaskScheduler] Job {job_id} not found in cancel events for cancellation") class WebSocketClient(BaseCommunicationClient): @@ -865,7 +876,7 @@ class WebSocketClient(BaseCommunicationClient): if self.task_scheduler: self.task_scheduler.publish_job_status(feedback_data, item, status, return_info) else: - logger.warning(f"[WebSocket] Task scheduler not available for job status: {item.job_id}") + logger.debug(f"[WebSocket] Task scheduler not available for job status: {item.job_id}") def send_ping(self, ping_id: str, timestamp: float) -> None: """发送ping消息""" @@ -879,9 +890,9 @@ class WebSocketClient(BaseCommunicationClient): def cancel_goal(self, job_id: str) -> None: """取消指定的任务(转发给TaskScheduler)""" - logger.critical(f"[WebSocket] Received cancel goal request for job_id: {job_id}") + logger.debug(f"[WebSocket] Received cancel goal request for job_id: {job_id}") if self.task_scheduler: self.task_scheduler.cancel_goal(job_id) - logger.critical(f"[WebSocket] Forwarded cancel goal to TaskScheduler for job_id: {job_id}") + logger.debug(f"[WebSocket] Forwarded cancel goal to TaskScheduler for job_id: {job_id}") else: - logger.warning(f"[WebSocket] Task scheduler not available for cancel goal: {job_id}") + logger.debug(f"[WebSocket] Task scheduler not available for cancel goal: {job_id}")