diff --git a/unilabos/app/model.py b/unilabos/app/model.py index 48b3a689..a7c199c9 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -53,6 +53,7 @@ class JobAddReq(BaseModel): action: str = Field(examples=["_execute_driver_command_async"], description="action name", default="") action_type: str = Field(examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action name", default="") action_args: dict = Field(examples=[{'string': 'string'}], description="action name", default="") + task_id: str = Field(examples=["task_id"], description="task uuid") job_id: str = Field(examples=["job_id"], description="goal uuid") node_id: str = Field(examples=["node_id"], description="node uuid") server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info") diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index 621a2138..f48b74b9 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -60,8 +60,6 @@ class TaskScheduler: self.active_jobs = {} # job_id -> 任务信息 self.cancel_events = {} # job_id -> asyncio.Event for cancellation - self.just_free_sets = {} # device_name + action_name -> end_timestamp - # 队列处理器 self.queue_processor_thread = None self.queue_running = False @@ -144,15 +142,15 @@ class TaskScheduler: # 如果需要继续,放入重新排队列表 if should_continue: - item.next_run_time = current_time + 10.0 # 10秒后再次执行 + item.next_run_time = current_time + 10 # 10秒后再次执行 item.retry_count += 1 items_to_requeue.append(item) logger.critical( - f"[TaskScheduler] Re-queued {item.task_type} for {item.device_action_key}" + f"[TaskScheduler] Re-queued {item.job_id} {item.task_type} for {item.device_action_key}" ) else: - logger.critical( - f"[TaskScheduler] Completed {item.task_type} for {item.device_action_key}" + logger.warning( + f"[TaskScheduler] Completed {item.job_id} {item.task_type} for {item.device_action_key}" ) except Exception as e: @@ -192,21 +190,16 @@ class TaskScheduler: # 发送状态报告 if free: # 设备空闲,发送最终状态并停止 + # 下面要增加和handle_query_state相同的逻辑 + host_node._device_action_status[item.device_action_key].job_ids[item.job_id] = time.time() await self._publish_device_action_state( - item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", True, 0.0 + item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", True, 0 ) - self.just_free_sets[item.device_action_key] = time.time() + 30.0 return False # 停止继续监控 else: - if item.device_action_key in self.just_free_sets: - if time.time() < self.just_free_sets[item.device_action_key]: - return True # 继续监控 - else: - del self.just_free_sets[item.device_action_key] - # 设备忙碌,发送状态并继续监控 await self._publish_device_action_state( - item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", False, 10.0 + item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", False, 10 ) return True # 继续监控 @@ -217,7 +210,6 @@ class TaskScheduler: async def _process_job_callback_item(self, item: QueueItem) -> bool: """处理job_call_back_status类型的队列项,返回True表示需要继续,False表示可以停止""" try: - logger.critical(f"[TaskScheduler] Processing job callback item for job_id: {item.job_id}") # 检查任务是否还在活跃列表中 if item.job_id not in self.active_jobs: logger.critical(f"[TaskScheduler] Job {item.job_id} no longer active") @@ -239,21 +231,15 @@ class TaskScheduler: action_jobs = len(host_node._device_action_status[item.device_action_key].job_ids) free = not bool(action_jobs) - logger.critical( - f"[TaskScheduler] Job {item.job_id} callback status check - free: {free}, action_jobs: {action_jobs}" - ) - # 发送job_call_back_status状态 await self._publish_device_action_state( - item.device_id, item.action_name, item.task_id, item.job_id, "job_call_back_status", free, 10.0 + item.device_id, item.action_name, item.task_id, item.job_id, "job_call_back_status", free, 10 ) # 如果任务完成,停止监控 if free: - logger.critical(f"[TaskScheduler] Job {item.job_id} callback monitoring completed - device is free") return False else: - logger.critical(f"[TaskScheduler] Job {item.job_id} callback monitoring continues - device is busy") return True # 继续监控 except Exception as e: @@ -262,13 +248,13 @@ class TaskScheduler: # 消息发送方法 async def _publish_device_action_state( - self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: float + self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int ) -> None: """发布设备动作状态""" message = { "action": "report_action_state", "data": { - "action": typ, + "type": typ, "device_id": device_id, "action_name": action_name, "task_id": task_id, @@ -278,36 +264,6 @@ class TaskScheduler: }, } await self.message_sender.send_message(message) - logger.critical(f"[TaskScheduler] Published action state: {device_id}/{action_name} - {typ}") - - async def _publish_final_job_status( - self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[str] = None - ) -> None: - """发布最终作业状态""" - if not self.message_sender.is_connected(): - logger.warning("[TaskScheduler] Not connected, cannot publish final job status") - return - - # 只处理最终状态 - if status not in ["completed", "failed", "cancelled"]: - logger.critical(f"[TaskScheduler] Ignoring non-final status: {status}") - return - - message = { - "action": "job_status", - "data": { - "job_id": item.job_id, - "task_id": item.task_id, - "device_id": item.device_id, - "action_name": item.action_name, - "status": status, - "feedback_data": feedback_data, - "return_info": return_info, - "timestamp": time.time(), - }, - } - await self.message_sender.send_message(message) - logger.critical(f"[TaskScheduler] Final job status published: {item.job_id} - {status}") # 业务逻辑处理方法 async def handle_query_state(self, data: Dict[str, str]) -> None: @@ -341,9 +297,14 @@ class TaskScheduler: # 如果设备空闲,立即响应free状态 if free: await self._publish_device_action_state( - device_id, action_name, task_id, job_id, "query_action_status", True, 0.0 + device_id, action_name, task_id, job_id, "query_action_status", True, 0 ) - logger.critical(f"[TaskScheduler] Device {device_id}/{action_name} is free, responded immediately") + logger.error(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is free, responded immediately") + host_node = HostNode.get_instance(0) + if not host_node: + logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {job_id}") + return + host_node._device_action_status[device_action_key].job_ids[job_id] = time.time() return # 设备忙碌时,检查是否已有相同的轮询任务 @@ -371,14 +332,14 @@ class TaskScheduler: task_id=task_id, job_id=job_id, device_action_key=device_action_key, - next_run_time=time.time() + 10.0, # 10秒后执行 + next_run_time=time.time() + 10, # 10秒后执行 ) self.action_queue.append(queue_item) - logger.critical(f"[TaskScheduler] Device {device_id}/{action_name} is busy, added to polling queue") + logger.error(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is busy, added to polling queue {action_jobs}") # 立即发送busy状态 await self._publish_device_action_state( - device_id, action_name, task_id, job_id, "query_action_status", False, 10.0 + device_id, action_name, task_id, job_id, "query_action_status", False, 10 ) else: logger.warning("[TaskScheduler] Action queue not available") @@ -404,22 +365,13 @@ class TaskScheduler: "callback_started": False, # 标记callback是否已启动 } - # 创建取消事件 + # 创建取消事件,todo:要移动到query_state中 self.cancel_events[req.job_id] = asyncio.Event() - logger.critical(f"[TaskScheduler] Created cancel event for job_id: {req.job_id}") try: - host_node = HostNode.get_instance(0) - if not host_node: - logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {req.job_id}") - return - - host_node._device_action_status[device_action_key].job_ids[req.job_id] = time.time() - logger.critical(f"[TaskScheduler] Job registered in HostNode: {req.job_id}") - # 启动callback定时发送 await self._start_job_callback( - req.job_id, req.device_id, req.action, data.get("task_id", ""), device_action_key + req.job_id, req.device_id, req.action, req.task_id, device_action_key ) # 创建兼容HostNode的QueueItem对象 @@ -427,20 +379,21 @@ class TaskScheduler: task_type="job_call_back_status", device_id=req.device_id, action_name=req.action, - task_id=data.get("task_id", ""), + task_id=req.task_id, job_id=req.job_id, device_action_key=device_action_key, next_run_time=time.time(), ) - - logger.critical(f"[TaskScheduler] Sending goal to HostNode for job_id: {req.job_id}") + host_node = HostNode.get_instance(0) + if not host_node: + logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {req.job_id}") + return host_node.send_goal( job_queue_item, action_type=req.action_type, action_kwargs=req.action_args, server_info=req.server_info, ) - logger.critical(f"[TaskScheduler] Goal sent successfully for job_id: {req.job_id}") except Exception as e: logger.error(f"[TaskScheduler] Exception during job start for job_id {req.job_id}: {str(e)}") traceback.print_exc() @@ -518,7 +471,6 @@ class TaskScheduler: self, job_id: str, device_id: str, action_name: str, task_id: str, device_action_key: str ) -> None: """启动job的callback定时发送""" - logger.critical(f"[TaskScheduler] Starting job callback for job_id: {job_id}") if job_id not in self.active_jobs: logger.warning(f"[TaskScheduler] Job not found in active jobs when starting callback: {job_id}") return @@ -530,7 +482,6 @@ class TaskScheduler: # 标记callback已启动 self.active_jobs[job_id]["callback_started"] = True - logger.critical(f"[TaskScheduler] Marked callback as started for job_id: {job_id}") # 将job_call_back_status任务放入队列 queue_item = QueueItem( @@ -540,12 +491,11 @@ class TaskScheduler: task_id=task_id, job_id=job_id, device_action_key=device_action_key, - next_run_time=time.time() + 10.0, # 10秒后开始报送 + next_run_time=time.time() + 10, # 10秒后开始报送 ) if self.action_queue is not None: with self.action_queue_lock: self.action_queue.append(queue_item) - logger.critical(f"[TaskScheduler] Added job callback to queue for job_id: {job_id}") else: logger.warning(f"[TaskScheduler] Action queue not available for job callback: {job_id}") @@ -575,23 +525,8 @@ class TaskScheduler: # 发送最终的callback状态 await self._publish_device_action_state( - device_id, action_name, task_id, job_id, "job_call_back_status", True, 0.0 + device_id, action_name, task_id, job_id, "job_call_back_status", True, 0 ) - logger.critical(f"[TaskScheduler] Published final callback status for job_id: {job_id}") - - # 发送最终的job状态 - error_queue_item = QueueItem( - task_type="job_call_back_status", - device_id=device_id, - action_name=action_name, - task_id=task_id, - job_id=job_id, - device_action_key=device_action_key, - next_run_time=time.time(), - ) - await self._publish_final_job_status({}, error_queue_item, final_status, return_info) - logger.critical(f"[TaskScheduler] Published final job status for job_id: {job_id}") - logger.critical( f"[TaskScheduler] Completed stopping job callback for {job_id} with final status: {final_status}" ) @@ -601,28 +536,22 @@ class TaskScheduler: self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[str] = None ) -> None: """发布作业状态,拦截最终结果(给HostNode调用的接口)""" - logger.critical(f"[TaskScheduler] Publishing job status for job_id: {item.job_id} - status: {status}") if not self.message_sender.is_connected(): logger.warning(f"[TaskScheduler] Not connected, cannot publish job status for job_id: {item.job_id}") return # 拦截最终结果状态 - if status in ["completed", "failed"]: + if status in ["success", "failed"]: + host_node = HostNode.get_instance(0) + if host_node: + host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id) logger.critical(f"[TaskScheduler] Intercepting final status for job_id: {item.job_id} - {status}") # 如果是最终状态,通过_stop_job_callback处理 - try: - loop = asyncio.get_event_loop() - loop.create_task(self._stop_job_callback(item.job_id, status, return_info)) - logger.critical(f"[TaskScheduler] Scheduled final callback stop for job_id: {item.job_id}") - except RuntimeError: - # 如果没有运行的事件循环,创建一个 - asyncio.run(self._stop_job_callback(item.job_id, status, return_info)) - logger.critical(f"[TaskScheduler] Executed final callback stop for job_id: {item.job_id}") + if self.message_sender.event_loop: + asyncio.run_coroutine_threadsafe(self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop).result() + logger.critical(f"[TaskScheduler] Scheduled final callback stop for job_id: {item.job_id}") logger.critical(f"[TaskScheduler] Intercepted final job status: {item.job_id} - {status}") - return - # 对于running状态,正常发布 - logger.critical(f"[TaskScheduler] Publishing running status for job_id: {item.job_id} - {status}") message = { "action": "job_status", "data": { @@ -639,7 +568,6 @@ class TaskScheduler: try: loop = asyncio.get_event_loop() loop.create_task(self.message_sender.send_message(message)) - logger.critical(f"[TaskScheduler] Scheduled message send for job_id: {item.job_id} - {status}") except RuntimeError: asyncio.run(self.message_sender.send_message(message)) logger.critical(f"[TaskScheduler] Executed message send for job_id: {item.job_id} - {status}") @@ -717,9 +645,9 @@ class WebSocketClient(BaseCommunicationClient): else: scheme = "ws" if ":" in parsed.netloc and parsed.port is not None: - self.websocket_url = f"{scheme}://{parsed.hostname}:{parsed.port + 1}/api/v1/ws/lab" + self.websocket_url = f"{scheme}://{parsed.hostname}:{parsed.port + 1}/api/v1/ws/schedule" else: - self.websocket_url = f"{scheme}://{parsed.netloc}/api/v1/ws/lab" + self.websocket_url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule" logger.debug(f"[WebSocket] URL: {self.websocket_url}") # 连接管理方法 @@ -934,10 +862,8 @@ class WebSocketClient(BaseCommunicationClient): self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[str] = None ) -> None: """发布作业状态(转发给TaskScheduler)""" - logger.critical(f"[WebSocket] Received job status for job_id: {item.job_id} - status: {status}") if self.task_scheduler: self.task_scheduler.publish_job_status(feedback_data, item, status, return_info) - logger.critical(f"[WebSocket] Forwarded job status to TaskScheduler for job_id: {item.job_id}") else: logger.warning(f"[WebSocket] Task scheduler not available for job status: {item.job_id}") diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index eed9f420..a39dfa30 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -233,12 +233,12 @@ class HostNode(BaseROS2DeviceNode): client: HTTPClient = bridge resource_start_time = time.time() - # resource_add_res = client.resource_add(add_schema(resource_with_parent_name), False) + resource_add_res = client.resource_add(add_schema(resource_with_parent_name), False) resource_end_time = time.time() self.lab_logger().info( f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms" ) - # resource_add_res = client.resource_edge_add(self.resources_edge_config, False) + resource_add_res = client.resource_edge_add(self.resources_edge_config, False) resource_edge_end_time = time.time() self.lab_logger().info( f"[Host Node-Resource] 物料关系上传 {round(resource_edge_end_time - resource_end_time, 5) * 1000} ms" @@ -707,7 +707,6 @@ class HostNode(BaseROS2DeviceNode): def get_result_callback(self, item: "QueueItem", action_id: str, future) -> None: """获取结果回调""" job_id = item.job_id - self._device_action_status[f"/devices/{item.device_id}/{item.action_name}"].job_ids.pop(item.job_id) result_msg = future.result().result result_data = convert_from_ros_msg(result_msg) status = "success" @@ -738,16 +737,6 @@ class HostNode(BaseROS2DeviceNode): for bridge in self.bridges: if hasattr(bridge, "publish_job_status"): bridge.publish_job_status(result_data, item, status, return_info_str) - # 如果是WebSocket客户端,通知任务完成 - if hasattr(bridge, "_finish_job_callback_status"): - import asyncio - - free = True # 任务完成,设备空闲 - need_more = 0.0 # 任务结束,不需要更多时间 - try: - asyncio.create_task(bridge._finish_job_callback_status(job_id, free, need_more)) - except Exception as e: - self.lab_logger().error(f"[Host Node] Error finishing job callback status: {e}") def cancel_goal(self, goal_uuid: str) -> None: """取消目标"""