From 779c9693d9988f86a6b406da966564aab7096426 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Tue, 16 Sep 2025 05:24:42 +0800 Subject: [PATCH] refactor ws client --- unilabos/app/ws_client.py | 1728 +++++++++++++++++++------------------ 1 file changed, 871 insertions(+), 857 deletions(-) diff --git a/unilabos/app/ws_client.py b/unilabos/app/ws_client.py index f4092cc8..0e5e8ec9 100644 --- a/unilabos/app/ws_client.py +++ b/unilabos/app/ws_client.py @@ -1,10 +1,11 @@ #!/usr/bin/env python # coding=utf-8 """ -WebSocket通信客户端和任务调度器 +WebSocket通信客户端重构版本 v2 -基于WebSocket协议的通信客户端实现,继承自BaseCommunicationClient。 -包含WebSocketClient(连接管理)和TaskScheduler(任务调度)两个类。 +基于两线程架构的WebSocket客户端实现: +1. 消息处理线程 - 处理WebSocket消息,划分任务执行和任务队列 +2. 队列处理线程 - 定时给发送队列推送消息,管理任务状态 """ import json @@ -16,9 +17,11 @@ import asyncio import traceback import websockets import ssl as ssl_module -from dataclasses import dataclass -from typing import Optional, Dict, Any +from queue import Queue, Empty +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, Callable, List, Set from urllib.parse import urlparse +from enum import Enum from unilabos.app.model import JobAddReq from unilabos.ros.nodes.presets.host_node import HostNode from unilabos.utils.type_check import serialize_result_info @@ -27,6 +30,15 @@ from unilabos.config.config import WSConfig, HTTPConfig, BasicConfig from unilabos.utils import logger +class JobStatus(Enum): + """任务状态枚举""" + + QUEUE = "queue" # 排队中 + READY = "ready" # 已获得执行许可,等待开始 + STARTED = "started" # 执行中 + ENDED = "ended" # 已结束 + + @dataclass class QueueItem: """队列项数据结构""" @@ -37,236 +49,651 @@ class QueueItem: task_id: str job_id: str device_action_key: str - next_run_time: float # 下次执行时间戳 + next_run_time: float = 0 # 下次执行时间戳 retry_count: int = 0 # 重试次数 -class TaskScheduler: - """ - 任务调度器类 +@dataclass +class JobInfo: + """任务信息数据结构""" - 负责任务队列管理、状态跟踪、业务逻辑处理等功能。 - """ + job_id: str + task_id: str + device_id: str + action_name: str + device_action_key: str + status: JobStatus + start_time: float + last_update_time: float = field(default_factory=time.time) - def __init__(self, message_sender: "WebSocketClient"): - """初始化任务调度器""" - self.message_sender = message_sender + def update_timestamp(self): + """更新最后更新时间""" + self.last_update_time = time.time() - # 队列管理 - self.action_queue = [] # 任务队列 - self.action_queue_lock = threading.Lock() # 队列锁 - # 任务状态跟踪 - self.active_jobs = {} # job_id -> 任务信息 - self.cancel_events = {} # job_id -> asyncio.Event for cancellation +@dataclass +class WebSocketMessage: + """WebSocket消息数据结构""" - # 立即执行标记字典 - device_id+action_name -> timestamp - self.immediate_execution_flags = {} # 存储需要立即执行的设备动作组合 - self.immediate_execution_lock = threading.Lock() # 立即执行标记锁 + action: str + data: Dict[str, Any] + timestamp: float = field(default_factory=time.time) - # 队列处理器 - self.queue_processor_thread = None - self.queue_running = False - # 队列处理器相关方法 +class DeviceActionManager: + """设备动作管理器 - 管理每个device_action_key的任务队列""" + + def __init__(self): + self.device_queues: Dict[str, List[JobInfo]] = {} # device_action_key -> job queue + self.active_jobs: Dict[str, JobInfo] = {} # device_action_key -> active job + self.all_jobs: Dict[str, JobInfo] = {} # job_id -> job_info + self.lock = threading.RLock() + + def add_queue_request(self, job_info: JobInfo) -> bool: + """ + 添加队列请求 + 返回True表示可以立即执行(free),False表示需要排队(busy) + """ + with self.lock: + device_key = job_info.device_action_key + + # 总是将job添加到all_jobs中 + self.all_jobs[job_info.job_id] = job_info + + # 检查是否有正在执行或准备执行的任务 + if device_key in self.active_jobs: + # 有正在执行或准备执行的任务,加入队列 + if device_key not in self.device_queues: + self.device_queues[device_key] = [] + job_info.status = JobStatus.QUEUE + self.device_queues[device_key].append(job_info) + logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} queued for {device_key}") + return False + + # 检查是否有排队的任务 + if device_key in self.device_queues and self.device_queues[device_key]: + # 有排队的任务,加入队列末尾 + job_info.status = JobStatus.QUEUE + self.device_queues[device_key].append(job_info) + logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} queued for {device_key}") + return False + + # 没有正在执行或排队的任务,可以立即执行 + # 将其状态设为READY并占位,防止后续job也被判断为free + job_info.status = JobStatus.READY + job_info.update_timestamp() + self.active_jobs[device_key] = job_info + logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} can start immediately for {device_key}") + return True + + def start_job(self, job_id: str) -> bool: + """ + 开始执行任务 + 返回True表示成功开始,False表示失败 + """ + with self.lock: + if job_id not in self.all_jobs: + logger.error(f"[DeviceActionManager] Job {job_id[:4]} not found for start") + return False + + job_info = self.all_jobs[job_id] + device_key = job_info.device_action_key + + # 检查job的状态是否正确 + if job_info.status != JobStatus.READY: + logger.error( + f"[DeviceActionManager] Job {job_id[:4]} is not in READY status, current: {job_info.status}" + ) + return False + + # 检查设备上是否是这个job + if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id: + logger.error(f"[DeviceActionManager] Job {job_id[:4]} is not the active job for {device_key}") + return False + + # 开始执行任务,将状态从READY转换为STARTED + job_info.status = JobStatus.STARTED + job_info.update_timestamp() + + logger.info(f"[DeviceActionManager] Job {job_id[:4]} started for {device_key}") + return True + + def end_job(self, job_id: str) -> Optional[JobInfo]: + """ + 结束任务,返回下一个可以执行的任务(如果有的话) + """ + with self.lock: + if job_id not in self.all_jobs: + logger.warning(f"[DeviceActionManager] Job {job_id[:4]} not found for end") + return None + + job_info = self.all_jobs[job_id] + device_key = job_info.device_action_key + + # 移除活跃任务 + if device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id: + del self.active_jobs[device_key] + job_info.status = JobStatus.ENDED + job_info.update_timestamp() + # 从all_jobs中移除已结束的job + del self.all_jobs[job_id] + logger.info(f"[DeviceActionManager] Job {job_id[:4]} ended for {device_key}") + else: + logger.warning(f"[DeviceActionManager] Job {job_id[:4]} was not active for {device_key}") + + # 检查队列中是否有等待的任务 + if device_key in self.device_queues and self.device_queues[device_key]: + next_job = self.device_queues[device_key].pop(0) # FIFO + # 将下一个job设置为READY状态并放入active_jobs + next_job.status = JobStatus.READY + next_job.update_timestamp() + self.active_jobs[device_key] = next_job + logger.info(f"[DeviceActionManager] Next job {next_job.job_id[:4]} can start for {device_key}") + return next_job + + return None + + def get_active_jobs(self) -> List[JobInfo]: + """获取所有正在执行的任务""" + with self.lock: + return list(self.active_jobs.values()) + + def get_queued_jobs(self) -> List[JobInfo]: + """获取所有排队中的任务""" + with self.lock: + queued = [] + for queue in self.device_queues.values(): + queued.extend(queue) + return queued + + def get_job_info(self, job_id: str) -> Optional[JobInfo]: + """获取任务信息""" + with self.lock: + return self.all_jobs.get(job_id) + + def cancel_job(self, job_id: str) -> bool: + """取消单个任务""" + with self.lock: + if job_id not in self.all_jobs: + logger.warning(f"[DeviceActionManager] Job {job_id[:4]} not found for cancel") + return False + + job_info = self.all_jobs[job_id] + device_key = job_info.device_action_key + + # 如果是正在执行的任务 + if ( + device_key in self.active_jobs and self.active_jobs[device_key].job_id == job_id + ): # 后面需要和cancel_goal进行联动,而不是在这里进行处理,现在默认等待这个job结束 + # del self.active_jobs[device_key] + # job_info.status = JobStatus.ENDED + # # 从all_jobs中移除 + # del self.all_jobs[job_id] + # logger.info(f"[DeviceActionManager] Active job {job_id[:4]} cancelled for {device_key}") + + # # 启动下一个任务 + # if device_key in self.device_queues and self.device_queues[device_key]: + # next_job = self.device_queues[device_key].pop(0) + # # 将下一个job设置为READY状态并放入active_jobs + # next_job.status = JobStatus.READY + # next_job.update_timestamp() + # self.active_jobs[device_key] = next_job + # logger.info(f"[DeviceActionManager] Next job {next_job.job_id[:4]} can start after cancel") + # return True + pass + + # 如果是排队中的任务 + elif device_key in self.device_queues: + original_length = len(self.device_queues[device_key]) + self.device_queues[device_key] = [j for j in self.device_queues[device_key] if j.job_id != job_id] + if len(self.device_queues[device_key]) < original_length: + job_info.status = JobStatus.ENDED + # 从all_jobs中移除 + del self.all_jobs[job_id] + logger.info(f"[DeviceActionManager] Queued job {job_id[:4]} cancelled for {device_key}") + return True + + logger.warning(f"[DeviceActionManager] Job {job_id[:4]} not found in active or queued jobs") + return False + + def cancel_jobs_by_task_id(self, task_id: str) -> List[str]: + """按task_id取消所有相关任务,返回被取消的job_id列表""" + cancelled_job_ids = [] + + # 首先找到所有属于该task_id的job + jobs_to_cancel = [] + with self.lock: + jobs_to_cancel = [job_info for job_info in self.all_jobs.values() if job_info.task_id == task_id] + + if not jobs_to_cancel: + logger.warning(f"[DeviceActionManager] No jobs found for task_id: {task_id}") + return cancelled_job_ids + + logger.info(f"[DeviceActionManager] Found {len(jobs_to_cancel)} jobs to cancel for task_id: {task_id}") + + # 逐个取消job + for job_info in jobs_to_cancel: + if self.cancel_job(job_info.job_id): + cancelled_job_ids.append(job_info.job_id) + + logger.info( + f"[DeviceActionManager] Successfully cancelled {len(cancelled_job_ids)} " f"jobs for task_id: {task_id}" + ) + + return cancelled_job_ids + + +class MessageProcessor: + """消息处理线程 - 处理WebSocket消息,划分任务执行和任务队列""" + + def __init__(self, websocket_url: str, send_queue: Queue, device_manager: DeviceActionManager): + self.websocket_url = websocket_url + self.send_queue = send_queue + self.device_manager = device_manager + self.queue_processor = None # 延迟设置 + self.websocket_client = None # 延迟设置 + + # WebSocket连接 + self.websocket = None + self.connected = False + + # 线程控制 + self.is_running = False + self.thread = None + self.reconnect_count = 0 + + logger.info(f"[MessageProcessor] Initialized for URL: {websocket_url}") + + def set_queue_processor(self, queue_processor: "QueueProcessor"): + """设置队列处理器引用""" + self.queue_processor = queue_processor + + def set_websocket_client(self, websocket_client: "WebSocketClient"): + """设置WebSocket客户端引用""" + self.websocket_client = websocket_client + def start(self) -> None: - """启动任务调度器""" - if self.queue_running: - logger.warning("[TaskScheduler] Already running") + """启动消息处理线程""" + if self.is_running: + logger.warning("[MessageProcessor] Already running") return - self.queue_running = True - self.queue_processor_thread = threading.Thread( - target=self._run_queue_processor, daemon=True, name="TaskScheduler" - ) - self.queue_processor_thread.start() + self.is_running = True + self.thread = threading.Thread(target=self._run, daemon=True, name="MessageProcessor") + self.thread.start() + logger.info("[MessageProcessor] Started") def stop(self) -> None: - """停止任务调度器""" - self.queue_running = False - if self.queue_processor_thread and self.queue_processor_thread.is_alive(): - self.queue_processor_thread.join(timeout=5) - logger.info("[TaskScheduler] Stopped") + """停止消息处理线程""" + self.is_running = False + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=5) + logger.info("[MessageProcessor] Stopped") - def _run_queue_processor(self): - """在独立线程中运行队列处理器""" + def _run(self): + """运行消息处理主循环""" loop = asyncio.new_event_loop() try: asyncio.set_event_loop(loop) - loop.run_until_complete(self._action_queue_processor()) + loop.run_until_complete(self._connection_handler()) except Exception as e: - logger.error(f"[TaskScheduler] Queue processor thread error: {str(e)}") + logger.error(f"[MessageProcessor] Thread error: {str(e)}") + logger.error(traceback.format_exc()) finally: if loop: loop.close() - async def _action_queue_processor(self) -> None: - """队列处理器 - 从队列头部取出任务处理,保持顺序,使用list避免队尾排队问题""" - logger.info("[TaskScheduler] Action queue processor started") + async def _connection_handler(self): + """处理WebSocket连接和重连逻辑""" + while self.is_running: + try: + # 构建SSL上下文 + ssl_context = None + if self.websocket_url.startswith("wss://"): + ssl_context = ssl_module.create_default_context() + + ws_logger = logging.getLogger("websockets.client") + ws_logger.setLevel(logging.INFO) + + async with websockets.connect( + self.websocket_url, + ssl=ssl_context, + ping_interval=WSConfig.ping_interval, + ping_timeout=10, + additional_headers={"Authorization": f"Lab {BasicConfig.auth_secret()}"}, + logger=ws_logger, + ) as websocket: + self.websocket = websocket + self.connected = True + self.reconnect_count = 0 + + logger.info(f"[MessageProcessor] Connected to {self.websocket_url}") + + # 启动发送协程 + send_task = asyncio.create_task(self._send_handler()) + + try: + # 接收消息循环 + await self._message_handler() + finally: + send_task.cancel() + try: + await send_task + except asyncio.CancelledError: + pass + self.connected = False + + except websockets.exceptions.ConnectionClosed: + logger.warning("[MessageProcessor] Connection closed") + self.connected = False + except Exception as e: + logger.error(f"[MessageProcessor] Connection error: {str(e)}") + logger.error(traceback.format_exc()) + self.connected = False + finally: + self.websocket = None + + # 重连逻辑 + if self.is_running and self.reconnect_count < WSConfig.max_reconnect_attempts: + self.reconnect_count += 1 + logger.info( + f"[MessageProcessor] Reconnecting in {WSConfig.reconnect_interval}s " + f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" + ) + await asyncio.sleep(WSConfig.reconnect_interval) + elif self.reconnect_count >= WSConfig.max_reconnect_attempts: + logger.error("[MessageProcessor] Max reconnection attempts reached") + break + else: + self.reconnect_count -= 1 + + async def _message_handler(self): + """处理接收到的消息""" + if not self.websocket: + logger.error("[MessageProcessor] WebSocket connection is None") + return try: - while self.queue_running: + async for message in self.websocket: try: - current_time = time.time() - items_to_process = [] - items_to_requeue = [] + data = json.loads(message) + await self._process_message(data) + except json.JSONDecodeError: + logger.error(f"[MessageProcessor] Invalid JSON received: {message}") + except Exception as e: + logger.error(f"[MessageProcessor] Error processing message: {str(e)}") + logger.error(traceback.format_exc()) - # 使用锁安全地复制队列内容 - with self.action_queue_lock: - if not self.action_queue: - # 队列为空,等待一段时间 - pass - else: - # 复制队列内容以避免并发修改问题 - items_to_process = self.action_queue.copy() - self.action_queue.clear() + except websockets.exceptions.ConnectionClosed: + logger.info("[MessageProcessor] Message handler stopped - connection closed") + except Exception as e: + logger.error(f"[MessageProcessor] Message handler error: {str(e)}") + logger.error(traceback.format_exc()) - if not items_to_process: - await asyncio.sleep(0.2) # 队列为空时等待 + async def _send_handler(self): + """处理发送队列中的消息""" + logger.debug("[MessageProcessor] Send handler started") + + try: + while self.connected and self.websocket: + try: + # 从发送队列获取消息(非阻塞) + messages_to_send = [] + max_batch = 10 + + while len(messages_to_send) < max_batch: + try: + message = self.send_queue.get_nowait() + messages_to_send.append(message) + except Empty: + break + + if not messages_to_send: + await asyncio.sleep(0.1) continue - with self.immediate_execution_lock: - expired_keys = [k for k, v in self.immediate_execution_flags.items() if current_time > v] - for k in expired_keys: - del self.immediate_execution_flags[k] - immediate_execution = self.immediate_execution_flags.copy() - # 处理每个任务 - for item in items_to_process: + # 批量发送消息 + for msg in messages_to_send: + if not self.connected or not self.websocket: + break + try: - # 检查是否到了执行时间,是我们本地的执行时间,按顺序填入 - if current_time < item.next_run_time and item.device_action_key not in immediate_execution: - # 还没到执行时间,保留在队列中(保持原有顺序) - items_to_requeue.append(item) - continue - - # 执行相应的任务 - should_continue = False - if item.task_type == "query_action_status": - should_continue = asyncio.run_coroutine_threadsafe( - self._process_query_status_item(item), self.message_sender.event_loop - ).result() - elif item.task_type == "job_call_back_status": - should_continue = asyncio.run_coroutine_threadsafe( - self._process_job_callback_item(item), self.message_sender.event_loop - ).result() - else: - logger.warning(f"[TaskScheduler] Unknown task type: {item.task_type}") - continue - - # 如果需要继续,放入重新排队列表 - if should_continue: - item.next_run_time = current_time + 10 # 10秒后再次执行 - item.retry_count += 1 - items_to_requeue.append(item) - logger.trace( # type: ignore - f"[TaskScheduler] Re-queued {item.job_id} {item.task_type} " - f"for {item.device_action_key}" - ) - else: - logger.debug( - f"[TaskScheduler] Completed {item.job_id} {item.task_type} " - f"for {item.device_action_key}" - ) - + message_str = json.dumps(msg, ensure_ascii=False) + await self.websocket.send(message_str) + logger.trace(f"[MessageProcessor] Message sent: {msg.get('action', 'unknown')}") # type: ignore # noqa: E501 except Exception as e: - logger.error(f"[TaskScheduler] Error processing item {item.task_type}: {str(e)}") + logger.error(f"[MessageProcessor] Failed to send message: {str(e)}") + logger.error(traceback.format_exc()) + break - # 将需要重新排队的任务放回队列开头(保持原有顺序,确保优先于新任务执行) - if items_to_requeue and self.action_queue is not None: - with self.action_queue_lock: - self.action_queue = items_to_requeue + self.action_queue - - await asyncio.sleep(0.1) # 短暂等待避免过度占用CPU + # 批量发送后短暂等待 + if len(messages_to_send) > 5: + await asyncio.sleep(0.001) except Exception as e: - logger.error(f"[TaskScheduler] Error in queue processor: {str(e)}") - await asyncio.sleep(1) # 错误后稍等再继续 + logger.error(f"[MessageProcessor] Error in send handler: {str(e)}") + logger.error(traceback.format_exc()) + await asyncio.sleep(0.1) except asyncio.CancelledError: - logger.info("[TaskScheduler] Action queue processor cancelled") + logger.debug("[MessageProcessor] Send handler cancelled") except Exception as e: - logger.error(f"[TaskScheduler] Fatal error in queue processor: {str(e)}") + logger.error(f"[MessageProcessor] Fatal error in send handler: {str(e)}") + logger.error(traceback.format_exc()) finally: - logger.info("[TaskScheduler] Action queue processor stopped") + logger.debug("[MessageProcessor] Send handler stopped") + + async def _process_message(self, data: Dict[str, Any]): + """处理收到的消息""" + message_type = data.get("action", "") + message_data = data.get("data", {}) + + logger.debug(f"[MessageProcessor] Processing message: {message_type}") - # 队列处理方法 - async def _process_query_status_item(self, item: QueueItem) -> bool: - """处理query_action_status类型的队列项,返回True表示需要继续,False表示可以停止""" try: - # 检查设备状态 - host_node = HostNode.get_instance(0) - if not host_node: - logger.error("[TaskScheduler] HostNode instance not available in queue processor") - return False - - action_jobs = len(host_node._device_action_status[item.device_action_key].job_ids) - free = not bool(action_jobs) - - # 发送状态报告 - if free: - # 设备空闲,发送最终状态并停止 - # 下面要增加和handle_query_state相同的逻辑 - host_node._device_action_status[item.device_action_key].job_ids[item.job_id] = time.time() - await self._publish_device_action_state( - item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", True, 0 - ) - return False # 停止继续监控 + if message_type == "pong": + self._handle_pong(message_data) + elif message_type == "query_action_state": + await self._handle_query_action_state(message_data) + elif message_type == "job_start": + await self._handle_job_start(message_data) + elif message_type == "cancel_action" or message_type == "cancel_task": + await self._handle_cancel_action(message_data) + elif message_type == "": + return else: - # 设备忙碌,发送状态并继续监控 - await self._publish_device_action_state( - item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", False, 10 - ) - return True # 继续监控 + logger.debug(f"[MessageProcessor] Unknown message type: {message_type}") except Exception as e: - logger.error(f"[TaskScheduler] Error processing query status item: {str(e)}") - return False # 出错则停止 + logger.error(f"[MessageProcessor] Error processing message {message_type}: {str(e)}") + logger.error(traceback.format_exc()) - async def _process_job_callback_item(self, item: QueueItem) -> bool: - """处理job_call_back_status类型的队列项,返回True表示需要继续,False表示可以停止""" + def _handle_pong(self, pong_data: Dict[str, Any]): + """处理pong响应""" + host_node = HostNode.get_instance(0) + if host_node: + host_node.handle_pong_response(pong_data) + + async def _handle_query_action_state(self, data: Dict[str, Any]): + """处理query_action_state消息""" + device_id = data.get("device_id", "") + action_name = data.get("action_name", "") + task_id = data.get("task_id", "") + job_id = data.get("job_id", "") + + if not all([device_id, action_name, task_id, job_id]): + logger.error("[MessageProcessor] Missing required fields in query_action_state") + return + + device_action_key = f"/devices/{device_id}/{action_name}" + + # 创建任务信息 + job_info = JobInfo( + job_id=job_id, + task_id=task_id, + device_id=device_id, + action_name=action_name, + device_action_key=device_action_key, + status=JobStatus.QUEUE, + start_time=time.time(), + ) + + # 添加到设备管理器 + can_start_immediately = self.device_manager.add_queue_request(job_info) + + # 发送响应 + if can_start_immediately: + # 可以立即开始 + await self._send_action_state_response( + device_id, action_name, task_id, job_id, "query_action_status", True, 0 + ) + logger.info(f"[MessageProcessor] Job {job_id[:4]} can start immediately") + else: + # 需要排队 + await self._send_action_state_response( + device_id, action_name, task_id, job_id, "query_action_status", False, 10 + ) + logger.info(f"[MessageProcessor] Job {job_id[:4]} queued") + + # 通知QueueProcessor有新的队列更新 + if self.queue_processor: + self.queue_processor.notify_queue_update() + + async def _handle_job_start(self, data: Dict[str, Any]): + """处理job_start消息""" try: - # 检查任务是否还在活跃列表中 - if item.job_id not in self.active_jobs: - logger.debug(f"[TaskScheduler] Job {item.job_id} no longer active") - return False + req = JobAddReq(**data) - # 检查是否收到取消信号 - if item.job_id in self.cancel_events and self.cancel_events[item.job_id].is_set(): - logger.info(f"[TaskScheduler] Job {item.job_id} cancelled via cancel event") - return False + # 开始执行任务 + success = self.device_manager.start_job(req.job_id) + if not success: + logger.error(f"[MessageProcessor] Failed to start job {req.job_id[:4]}") + return - # 检查设备状态 - host_node = HostNode.get_instance(0) - if not host_node: - logger.error( - f"[TaskScheduler] HostNode instance not available in job callback queue for job_id: {item.job_id}" - ) - return False + logger.info(f"[MessageProcessor] Starting job {req.job_id[:4]} for {req.device_id}/{req.action}") - # noinspection PyProtectedMember - action_jobs = len(host_node._device_action_status[item.device_action_key].job_ids) - free = not bool(action_jobs) - - # 发送job_call_back_status状态 - await self._publish_device_action_state( - item.device_id, item.action_name, item.task_id, item.job_id, "job_call_back_status", free, 10 + # 创建HostNode任务 + device_action_key = f"/devices/{req.device_id}/{req.action}" + queue_item = QueueItem( + task_type="job_call_back_status", + device_id=req.device_id, + action_name=req.action, + task_id=req.task_id, + job_id=req.job_id, + device_action_key=device_action_key, ) - # 如果任务完成,停止监控 - if free: - return False - else: - return True # 继续监控 + # 提交给HostNode执行 + host_node = HostNode.get_instance(0) + if not host_node: + logger.error(f"[MessageProcessor] HostNode instance not available for job_id: {req.job_id}") + return + + host_node.send_goal( + queue_item, + action_type=req.action_type, + action_kwargs=req.action_args, + server_info=req.server_info, + ) except Exception as e: - logger.error(f"[TaskScheduler] Error processing job callback item for job_id {item.job_id}: {str(e)}") - return False # 出错则停止 + logger.error(f"[MessageProcessor] Error handling job start: {str(e)}") + traceback.print_exc() - # 消息发送方法 - async def _publish_device_action_state( + # job_start出错时,需要通过正确的publish_job_status方法来处理 + if "req" in locals() and "queue_item" in locals(): + logger.info(f"[MessageProcessor] Publishing failed status for job {req.job_id[:4]}") + + if self.websocket_client: + # 使用完整的错误信息,与原版本一致 + self.websocket_client.publish_job_status( + {}, queue_item, "failed", serialize_result_info(traceback.format_exc(), False, {}) + ) + else: + # 备用方案:直接发送消息,但使用完整的错误信息 + message = { + "action": "job_status", + "data": { + "job_id": req.job_id, + "task_id": req.task_id, + "device_id": req.device_id, + "action_name": req.action, + "status": "failed", + "feedback_data": {}, + "return_info": serialize_result_info(traceback.format_exc(), False, {}), + "timestamp": time.time(), + }, + } + self.send_message(message) + + # 手动调用job结束逻辑 + next_job = self.device_manager.end_job(req.job_id) + if next_job: + # 通知下一个任务可以开始 + await self._send_action_state_response( + next_job.device_id, + next_job.action_name, + next_job.task_id, + next_job.job_id, + "query_action_status", + True, + 0, + ) + logger.info(f"[MessageProcessor] Started next job {next_job.job_id[:4]} after error") + + # 通知QueueProcessor有队列更新 + if self.queue_processor: + self.queue_processor.notify_queue_update() + else: + logger.warning("[MessageProcessor] Failed to publish job error status - missing req or queue_item") + + async def _handle_cancel_action(self, data: Dict[str, Any]): + """处理cancel_action/cancel_task消息""" + task_id = data.get("task_id") + job_id = data.get("job_id") + + logger.info(f"[MessageProcessor] Cancel request - task_id: {task_id}, job_id: {job_id}") + + if job_id: + # 按job_id取消单个job + success = self.device_manager.cancel_job(job_id) + if success: + # 通知HostNode取消 + host_node = HostNode.get_instance(0) + if host_node: + host_node.cancel_goal(job_id) + logger.info(f"[MessageProcessor] Job {job_id[:4]} cancelled") + + # 通知QueueProcessor有队列更新 + if self.queue_processor: + self.queue_processor.notify_queue_update() + else: + logger.warning(f"[MessageProcessor] Failed to cancel job {job_id[:4]}") + + elif task_id: + # 按task_id取消所有相关job + cancelled_job_ids = self.device_manager.cancel_jobs_by_task_id(task_id) + if cancelled_job_ids: + # 通知HostNode取消所有job + host_node = HostNode.get_instance(0) + if host_node: + for cancelled_job_id in cancelled_job_ids: + host_node.cancel_goal(cancelled_job_id) + + logger.info(f"[MessageProcessor] Cancelled {len(cancelled_job_ids)} jobs for task_id: {task_id}") + + # 通知QueueProcessor有队列更新 + if self.queue_processor: + self.queue_processor.notify_queue_update() + else: + logger.warning(f"[MessageProcessor] Failed to cancel any jobs for task_id: {task_id}") + else: + logger.warning("[MessageProcessor] Cancel request missing both task_id and job_id") + + async def _send_action_state_response( self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int - ) -> None: - """发布设备动作状态""" + ): + """发送动作状态响应""" message = { "action": "report_action_state", "data": { @@ -279,302 +706,301 @@ class TaskScheduler: "need_more": need_more, }, } - await self.message_sender.send_message(message) - # 业务逻辑处理方法 - async def handle_query_state(self, data: Dict[str, str]) -> None: - """处理query_action_state消息""" - device_id = data.get("device_id", "") - if not device_id: - logger.error("[TaskScheduler] query_action_state missing device_id") - return - action_name = data.get("action_name", "") - if not action_name: - logger.error("[TaskScheduler] query_action_state missing action_name") - return - task_id = data.get("task_id", "") - if not task_id: - logger.error("[TaskScheduler] query_action_state missing task_id") - return - job_id = data.get("job_id", "") - if not job_id: - logger.error("[TaskScheduler] query_action_state missing job_id") - return - - device_action_key = f"/devices/{device_id}/{action_name}" - host_node = HostNode.get_instance(0) - if not host_node: - logger.error("[TaskScheduler] HostNode instance not available") - return - - action_jobs = len(host_node._device_action_status[device_action_key].job_ids) - free = not bool(action_jobs) - - # 如果设备空闲,立即响应free状态 - if free: - await self._publish_device_action_state( - device_id, action_name, task_id, job_id, "query_action_status", True, 0 - ) - logger.debug(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is free, responded immediately") - host_node = HostNode.get_instance(0) - if not host_node: - logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {job_id}") - return - host_node._device_action_status[device_action_key].job_ids[job_id] = time.time() - return - - # 设备忙碌时,检查是否已有相同的轮询任务 - if self.action_queue is not None: - with self.action_queue_lock: - # 检查是否已存在相同job_id和task_id的轮询任务 - for existing_item in self.action_queue: - if ( - existing_item.task_type == "query_action_status" - and existing_item.job_id == job_id - and existing_item.task_id == task_id - and existing_item.device_action_key == device_action_key - ): - logger.error( - f"[TaskScheduler] Duplicate query_action_state ignored: " - f"job_id={job_id}, task_id={task_id}, server error" - ) - return - - # 没有重复,加入轮询队列 - queue_item = QueueItem( - task_type="query_action_status", - device_id=device_id, - action_name=action_name, - task_id=task_id, - job_id=job_id, - device_action_key=device_action_key, - next_run_time=time.time() + 10, # 10秒后执行 - ) - self.action_queue.append(queue_item) - logger.debug( - f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is busy, " - f"added to polling queue {action_jobs}" - ) - - # 立即发送busy状态 - await self._publish_device_action_state( - device_id, action_name, task_id, job_id, "query_action_status", False, 10 - ) - else: - logger.warning("[TaskScheduler] Action queue not available") - - async def handle_job_start(self, data: Dict[str, Any]): - """处理作业启动消息""" try: - req = JobAddReq(**data) - device_action_key = f"/devices/{req.device_id}/{req.action}" + self.send_queue.put_nowait(message) + except Exception: + logger.warning("[MessageProcessor] Send queue full, dropping message") - logger.info( - f"[TaskScheduler] Starting job with job_id: {req.job_id}, " - f"device: {req.device_id}, action: {req.action}" - ) + def send_message(self, message: Dict[str, Any]) -> bool: + """发送消息到队列""" + try: + self.send_queue.put_nowait(message) + return True + except Exception: + logger.warning(f"[MessageProcessor] Failed to queue message: {message.get('action', 'unknown')}") + return False - # 添加到活跃任务 - self.active_jobs[req.job_id] = { - "device_id": req.device_id, - "action_name": req.action, - "task_id": data.get("task_id", ""), - "start_time": time.time(), - "device_action_key": device_action_key, - "callback_started": False, # 标记callback是否已启动 - } + def is_connected(self) -> bool: + """检查连接状态""" + return self.connected - # 创建取消事件,todo:要移动到query_state中 - self.cancel_events[req.job_id] = asyncio.Event() - # 启动callback定时发送 - await self._start_job_callback(req.job_id, req.device_id, req.action, req.task_id, device_action_key) - # 创建兼容HostNode的QueueItem对象 - job_queue_item = QueueItem( - task_type="job_call_back_status", - device_id=req.device_id, - action_name=req.action, - task_id=req.task_id, - job_id=req.job_id, - device_action_key=device_action_key, - next_run_time=time.time(), - ) +class QueueProcessor: + """队列处理线程 - 定时给发送队列推送消息,管理任务状态""" + + def __init__(self, device_manager: DeviceActionManager, message_processor: MessageProcessor): + self.device_manager = device_manager + self.message_processor = message_processor + + # 线程控制 + self.is_running = False + self.thread = None + + # 事件通知机制 + self.queue_update_event = threading.Event() + + logger.info("[QueueProcessor] Initialized") + + def start(self) -> None: + """启动队列处理线程""" + if self.is_running: + logger.warning("[QueueProcessor] Already running") + return + + self.is_running = True + self.thread = threading.Thread(target=self._run, daemon=True, name="QueueProcessor") + self.thread.start() + logger.info("[QueueProcessor] Started") + + def stop(self) -> None: + """停止队列处理线程""" + self.is_running = False + if self.thread and self.thread.is_alive(): + self.thread.join(timeout=5) + logger.info("[QueueProcessor] Stopped") + + def _run(self): + """运行队列处理主循环""" + logger.debug("[QueueProcessor] Queue processor started") + + while self.is_running: try: + # 发送正在执行任务的running状态 + self._send_running_status() + + # 发送排队任务的busy状态 + self._send_busy_status() + + # 等待10秒或者等待事件通知 + self.queue_update_event.wait(timeout=10) + self.queue_update_event.clear() # 清除事件 - host_node = HostNode.get_instance(0) - if not host_node: - logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {req.job_id}") - return - host_node.send_goal( - job_queue_item, - action_type=req.action_type, - action_kwargs=req.action_args, - server_info=req.server_info, - ) except Exception as e: - logger.error(f"[TaskScheduler] Exception during job start for job_id {req.job_id}: {str(e)}") - traceback.print_exc() - self.publish_job_status( - {}, job_queue_item, "failed", serialize_result_info(traceback.format_exc(), False, {}) - ) - except Exception as e: - logger.error(f"[TaskScheduler] Error handling job start: {str(e)}") + logger.error(f"[QueueProcessor] Error in queue processor: {str(e)}") + logger.error(traceback.format_exc()) + time.sleep(1) - async def handle_cancel_action(self, data: Dict[str, Any]) -> None: - """处理取消动作请求""" - task_id = data.get("task_id") - job_id = data.get("job_id") + logger.debug("[QueueProcessor] Queue processor stopped") - logger.debug(f"[TaskScheduler] Handling cancel action request - task_id: {task_id}, job_id: {job_id}") + def notify_queue_update(self): + """通知队列有更新,触发立即检查""" + self.queue_update_event.set() - if not task_id and not job_id: - logger.error("[TaskScheduler] cancel_action missing both task_id and job_id") + def _send_running_status(self): + """发送正在执行任务的running状态""" + if not self.message_processor.is_connected(): return - # 通过job_id取消 - if job_id: - logger.info(f"[TaskScheduler] Cancelling job by job_id: {job_id}") - # 设置取消事件 - if job_id in self.cancel_events: - self.cancel_events[job_id].set() - logger.debug(f"[TaskScheduler] Set cancel event for job_id: {job_id}") + active_jobs = self.device_manager.get_active_jobs() + for job_info in active_jobs: + # 只给真正在执行的job发送running状态,READY状态的job不需要 + if job_info.status != JobStatus.STARTED: + continue + + message = { + "action": "report_action_state", + "data": { + "type": "job_call_back_status", + "device_id": job_info.device_id, + "action_name": job_info.action_name, + "task_id": job_info.task_id, + "job_id": job_info.job_id, + "free": False, + "need_more": 10, + }, + } + self.message_processor.send_message(message) + logger.trace(f"[QueueProcessor] Sent running status for job {job_info.job_id[:4]}") # type: ignore + + def _send_busy_status(self): + """发送排队任务的busy状态""" + if not self.message_processor.is_connected(): + return + + queued_jobs = self.device_manager.get_queued_jobs() + if not queued_jobs: + return + + logger.debug(f"[QueueProcessor] Sending busy status for {len(queued_jobs)} queued jobs") + + for job_info in queued_jobs: + message = { + "action": "report_action_state", + "data": { + "type": "query_action_status", + "device_id": job_info.device_id, + "action_name": job_info.action_name, + "task_id": job_info.task_id, + "job_id": job_info.job_id, + "free": False, + "need_more": 10, + }, + } + success = self.message_processor.send_message(message) + if success: + logger.debug(f"[QueueProcessor] Sent busy/need_more for queued job {job_info.job_id[:4]}") else: - logger.warning(f"[TaskScheduler] Cancel event not found for job_id: {job_id}") + logger.warning(f"[QueueProcessor] Failed to send busy status for job {job_info.job_id[:4]}") - # 停止job callback并发送取消状态 - if job_id in self.active_jobs: - logger.debug(f"[TaskScheduler] Found active job for cancellation: {job_id}") - # 调用HostNode的cancel_goal - host_node = HostNode.get_instance(0) - if host_node: - host_node.cancel_goal(job_id) - logger.info(f"[TaskScheduler] Cancelled goal in HostNode for job_id: {job_id}") - else: - logger.error(f"[TaskScheduler] HostNode not available for cancel goal: {job_id}") + def handle_job_completed(self, job_id: str, status: str) -> None: + """处理任务完成""" + logger.info(f"[QueueProcessor] Job {job_id[:4]} completed with status: {status}") - # 停止callback并发送取消状态 - await self._stop_job_callback(job_id, "cancelled") - logger.info(f"[TaskScheduler] Stopped job callback and sent cancel status for job_id: {job_id}") - else: - logger.warning(f"[TaskScheduler] Job not found in active jobs for cancellation: {job_id}") + # 结束任务,获取下一个可执行的任务 + next_job = self.device_manager.end_job(job_id) - # 通过task_id取消(需要查找对应的job_id) - if task_id and not job_id: - logger.debug(f"[TaskScheduler] Cancelling jobs by task_id: {task_id}") - jobs_to_cancel = [] - for jid, job_info in self.active_jobs.items(): - if job_info.get("task_id") == task_id: - jobs_to_cancel.append(jid) + if next_job and self.message_processor.is_connected(): + # 通知下一个任务可以开始 + message = { + "action": "report_action_state", + "data": { + "type": "query_action_status", + "device_id": next_job.device_id, + "action_name": next_job.action_name, + "task_id": next_job.task_id, + "job_id": next_job.job_id, + "free": True, + "need_more": 0, + }, + } + self.message_processor.send_message(message) + logger.info(f"[QueueProcessor] Notified next job {next_job.job_id[:4]} can start") - logger.debug( - f"[TaskScheduler] Found {len(jobs_to_cancel)} jobs to cancel for task_id {task_id}: {jobs_to_cancel}" - ) + # 立即触发下一轮状态检查 + self.notify_queue_update() - for jid in jobs_to_cancel: - logger.debug(f"[TaskScheduler] Recursively cancelling job_id: {jid} for task_id: {task_id}") - # 递归调用自身来取消每个job - await self.handle_cancel_action({"job_id": jid}) - logger.debug(f"[TaskScheduler] Completed cancel action handling - task_id: {task_id}, job_id: {job_id}") +class WebSocketClient(BaseCommunicationClient): + """ + 重构后的WebSocket客户端 v2 - # job管理方法 - async def _start_job_callback( - self, job_id: str, device_id: str, action_name: str, task_id: str, device_action_key: str - ) -> None: - """启动job的callback定时发送""" - if job_id not in self.active_jobs: - logger.debug(f"[TaskScheduler] Job not found in active jobs when starting callback: {job_id}") - return + 采用两线程架构: + - 消息处理线程:处理WebSocket消息,划分任务执行和任务队列 + - 队列处理线程:定时给发送队列推送消息,管理任务状态 + """ - # 检查是否已经启动过callback - if self.active_jobs[job_id].get("callback_started", False): - logger.warning(f"[TaskScheduler] Job callback already started for job_id: {job_id}") - return + def __init__(self): + super().__init__() + self.is_disabled = False + self.client_id = f"{uuid.uuid4()}" - # 标记callback已启动 - self.active_jobs[job_id]["callback_started"] = True + # 核心组件 + self.device_manager = DeviceActionManager() + self.send_queue = Queue(maxsize=1000) - # 将job_call_back_status任务放入队列 - queue_item = QueueItem( - task_type="job_call_back_status", - device_id=device_id, - action_name=action_name, - task_id=task_id, - job_id=job_id, - device_action_key=device_action_key, - next_run_time=time.time() + 10, # 10秒后开始报送 - ) - if self.action_queue is not None: - with self.action_queue_lock: - self.action_queue.append(queue_item) + # 构建WebSocket URL + self.websocket_url = self._build_websocket_url() + + # 两个核心线程 + self.message_processor = MessageProcessor(self.websocket_url, self.send_queue, self.device_manager) + self.queue_processor = QueueProcessor(self.device_manager, self.message_processor) + + # 设置相互引用 + self.message_processor.set_queue_processor(self.queue_processor) + self.message_processor.set_websocket_client(self) + + logger.info(f"[WebSocketClient] Client_id: {self.client_id}") + + def _build_websocket_url(self) -> Optional[str]: + """构建WebSocket连接URL""" + if not HTTPConfig.remote_addr: + return None + + parsed = urlparse(HTTPConfig.remote_addr) + + if parsed.scheme == "https": + scheme = "wss" else: - logger.debug(f"[TaskScheduler] Action queue not available for job callback: {job_id}") + scheme = "ws" - async def _stop_job_callback(self, job_id: str, final_status: str) -> None: - """停止job的callback定时发送并发送最终结果""" - logger.info(f"[TaskScheduler] Stopping job callback for job_id: {job_id} with final status: {final_status}") - if job_id not in self.active_jobs: - logger.debug(f"[TaskScheduler] Job {job_id} not found in active jobs when stopping callback") + if ":" in parsed.netloc and parsed.port is not None: + url = f"{scheme}://{parsed.hostname}:{parsed.port + 1}/api/v1/ws/schedule" + else: + url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule" + + logger.debug(f"[WebSocketClient] URL: {url}") + return url + + def start(self) -> None: + """启动WebSocket客户端""" + if self.is_disabled: + logger.warning("[WebSocketClient] WebSocket is disabled, skipping connection.") return - job_info = self.active_jobs[job_id] - device_id = job_info["device_id"] - action_name = job_info["action_name"] - task_id = job_info["task_id"] - device_action_key = job_info["device_action_key"] + if not self.websocket_url: + logger.error("[WebSocketClient] WebSocket URL not configured") + return - logger.debug( - f"[TaskScheduler] Job {job_id} details - device: {device_id}, action: {action_name}, task: {task_id}" - ) + logger.info(f"[WebSocketClient] Starting connection to {self.websocket_url}") - # 移除活跃任务和取消事件(这会让队列处理器自动停止callback) - self.active_jobs.pop(job_id, None) - self.cancel_events.pop(job_id, None) - logger.debug(f"[TaskScheduler] Removed job {job_id} from active jobs and cancel events") + # 启动两个核心线程 + self.message_processor.start() + self.queue_processor.start() - # 发送最终的callback状态 - await self._publish_device_action_state( - device_id, action_name, task_id, job_id, "job_call_back_status", True, 0 - ) - logger.debug(f"[TaskScheduler] Completed stopping job callback for {job_id} with final status: {final_status}") + logger.info("[WebSocketClient] All threads started") + + def stop(self) -> None: + """停止WebSocket客户端""" + if self.is_disabled: + return + + logger.info("[WebSocketClient] Stopping connection") + + # 停止两个核心线程 + self.message_processor.stop() + self.queue_processor.stop() + + logger.info("[WebSocketClient] All threads stopped") + + # BaseCommunicationClient接口实现 + def is_connected(self) -> bool: + """检查是否已连接""" + return self.message_processor.is_connected() and not self.is_disabled + + def publish_device_status(self, device_status: dict, device_id: str, property_name: str) -> None: + """发布设备状态""" + if self.is_disabled or not self.is_connected(): + return + + message = { + "action": "device_status", + "data": { + "device_id": device_id, + "data": { + "property_name": property_name, + "status": device_status.get(device_id, {}).get(property_name), + "timestamp": time.time(), + }, + }, + } + self.message_processor.send_message(message) + logger.debug(f"[WebSocketClient] Device status published: {device_id}.{property_name}") - # 外部接口方法 def publish_job_status( - self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[dict] = None + self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[dict] = None ) -> None: """发布作业状态,拦截最终结果(给HostNode调用的接口)""" - if not self.message_sender.is_connected(): - logger.debug(f"[TaskScheduler] Not connected, cannot publish job status for job_id: {item.job_id}") + if not self.is_connected(): + logger.debug(f"[WebSocketClient] Not connected, cannot publish job status for job_id: {item.job_id}") return - # 拦截最终结果状态 + # 拦截最终结果状态,与原版本逻辑一致 if status in ["success", "failed"]: host_node = HostNode.get_instance(0) if host_node: - host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id) - logger.info(f"[TaskScheduler] Intercepting final status for job_id: {item.job_id} - {status}") - # 给其他同名action至少执行一次的机会 - with self.immediate_execution_lock: - self.immediate_execution_flags[item.device_action_key] = time.time() + 3 + # 从HostNode的device_action_status中移除job_id + try: + host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id, None) + except (KeyError, AttributeError): + logger.warning(f"[WebSocketClient] Failed to remove job {item.job_id} from HostNode status") - # 检查是否在同一个事件循环中 - try: - current_loop = asyncio.get_running_loop() - if current_loop == self.message_sender.event_loop: - # 在同一个事件循环中,直接创建任务 - asyncio.create_task(self._stop_job_callback(item.job_id, status)) - else: - # 不在同一个事件循环中,使用 run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe( - self._stop_job_callback(item.job_id, status), self.message_sender.event_loop - ) - except RuntimeError: - # 没有运行中的事件循环,使用 run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe( - self._stop_job_callback(item.job_id, status), self.message_sender.event_loop - ) + logger.info(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}") - # 执行结果信息上传 + # 通知队列处理器job完成 + self.queue_processor.handle_job_completed(item.job_id, status) + + # 发送job状态消息 message = { "action": "job_status", "data": { @@ -588,437 +1014,25 @@ class TaskScheduler: "timestamp": time.time(), }, } + self.message_processor.send_message(message) - # 同样检查事件循环 - try: - current_loop = asyncio.get_running_loop() - if current_loop == self.message_sender.event_loop: - # 在同一个事件循环中,直接创建任务 - asyncio.create_task(self.message_sender.send_message(message)) - else: - # 不在同一个事件循环中,使用 run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe( - self.message_sender.send_message(message), self.message_sender.event_loop - ) - except RuntimeError: - # 没有运行中的事件循环,使用 run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe(self.message_sender.send_message(message), self.message_sender.event_loop) - - logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore - - def cancel_goal(self, job_id: str) -> None: - """取消指定的任务(给外部调用的接口)""" - logger.debug(f"[TaskScheduler] External cancel request for job_id: {job_id}") - if job_id in self.cancel_events: - logger.debug(f"[TaskScheduler] Found cancel event for job_id: {job_id}, processing cancellation") - try: - loop = asyncio.get_event_loop() - loop.create_task(self.handle_cancel_action({"job_id": job_id})) - logger.debug(f"[TaskScheduler] Scheduled cancel action for job_id: {job_id}") - except RuntimeError: - asyncio.run(self.handle_cancel_action({"job_id": job_id})) - logger.debug(f"[TaskScheduler] Executed cancel action for job_id: {job_id}") - logger.debug(f"[TaskScheduler] Initiated cancel for job_id: {job_id}") - else: - logger.debug(f"[TaskScheduler] Job {job_id} not found in cancel events for cancellation") - - -class WebSocketClient(BaseCommunicationClient): - """ - WebSocket通信客户端类 - - 专注于WebSocket连接管理和消息传输。 - """ - - def __init__(self): - super().__init__() - self.is_disabled = False - self.client_id = f"{uuid.uuid4()}" - - # WebSocket连接相关 - self.websocket = None - self.connection_loop = None - self.event_loop: asyncio.AbstractEventLoop = None # type: ignore - self.connection_thread = None - self.is_running = False - self.connected = False - - # 消息处理 - self.reconnect_count = 0 - - # 消息发送队列和处理器 - self.send_queue = None # 延迟初始化 - self.send_queue_task = None # 发送队列处理任务 - - # 任务调度器 - self.task_scheduler = None - - # 构建WebSocket URL - self._build_websocket_url() - - logger.info(f"[WebSocket] Client_id: {self.client_id}") - - # 初始化方法 - def _initialize_task_scheduler(self): - """初始化任务调度器""" - if not self.task_scheduler: - self.task_scheduler = TaskScheduler(self) - self.task_scheduler.start() - logger.info("[WebSocket] Task scheduler initialized") - - def _build_websocket_url(self): - """构建WebSocket连接URL""" - if not HTTPConfig.remote_addr: - self.websocket_url = None - return - - # 解析服务器URL - parsed = urlparse(HTTPConfig.remote_addr) - - # 根据SSL配置选择协议 - if parsed.scheme == "https": - scheme = "wss" - else: - scheme = "ws" - if ":" in parsed.netloc and parsed.port is not None: - self.websocket_url = f"{scheme}://{parsed.hostname}:{parsed.port + 1}/api/v1/ws/schedule" - else: - self.websocket_url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule" - logger.debug(f"[WebSocket] URL: {self.websocket_url}") - - # 连接管理方法 - def start(self) -> None: - """启动WebSocket连接和任务调度器""" - if self.is_disabled: - logger.warning("[WebSocket] WebSocket is disabled, skipping connection.") - return - - if not self.websocket_url: - logger.error("[WebSocket] WebSocket URL not configured") - return - - logger.info(f"[WebSocket] Starting connection to {self.websocket_url}") - - # 初始化任务调度器 - self._initialize_task_scheduler() - - self.is_running = True - - # 在单独线程中运行WebSocket连接 - self.connection_thread = threading.Thread(target=self._run_connection, daemon=True, name="WebSocketConnection") - self.connection_thread.start() - - def stop(self) -> None: - """停止WebSocket连接和任务调度器""" - if self.is_disabled: - return - - logger.info("[WebSocket] Stopping connection") - self.is_running = False - self.connected = False - - # 停止任务调度器 - if self.task_scheduler: - self.task_scheduler.stop() - - if self.event_loop and self.event_loop.is_running(): - asyncio.run_coroutine_threadsafe(self._close_connection(), self.event_loop) - - if self.connection_thread and self.connection_thread.is_alive(): - self.connection_thread.join(timeout=5) - - def _run_connection(self): - """在独立线程中运行WebSocket连接""" - try: - # 创建新的事件循环 - self.event_loop = asyncio.new_event_loop() - asyncio.set_event_loop(self.event_loop) - - # 在正确的事件循环中创建发送队列 - self.send_queue = asyncio.Queue(maxsize=1000) # 限制队列大小防止内存溢出 - - # 运行连接逻辑 - self.event_loop.run_until_complete(self._connection_handler()) - except Exception as e: - logger.error(f"[WebSocket] Connection thread error: {str(e)}") - logger.error(traceback.format_exc()) - finally: - if self.event_loop: - self.event_loop.close() - - async def _connection_handler(self): - """处理WebSocket连接和重连逻辑""" - while self.is_running: - try: - # 构建SSL上下文 - ssl_context = None - assert self.websocket_url is not None - if self.websocket_url.startswith("wss://"): - ssl_context = ssl_module.create_default_context() - ws_logger = logging.getLogger("websockets.client") - ws_logger.setLevel(logging.INFO) - async with websockets.connect( - self.websocket_url, - ssl=ssl_context, - ping_interval=WSConfig.ping_interval, - ping_timeout=10, - additional_headers={"Authorization": f"Lab {BasicConfig.auth_secret()}"}, - logger=ws_logger, - ) as websocket: - self.websocket = websocket - self.connected = True - self.reconnect_count = 0 - - logger.info(f"[WebSocket] Connected to {self.websocket_url}") - - # 启动发送队列处理器 - self.send_queue_task = asyncio.create_task(self._send_queue_processor()) - - try: - # 处理消息 - await self._message_handler() - finally: - # 停止发送队列处理器 - if self.send_queue_task and not self.send_queue_task.done(): - self.send_queue_task.cancel() - try: - await self.send_queue_task - except asyncio.CancelledError: - pass - - except websockets.exceptions.ConnectionClosed: - logger.warning("[WebSocket] Connection closed") - self.connected = False - except Exception as e: - logger.error(f"[WebSocket] Connection error: {str(e)}") - self.connected = False - finally: - # WebSocket连接结束时只需重置websocket对象 - self.websocket = None - - # 重连逻辑 - if self.is_running and self.reconnect_count < WSConfig.max_reconnect_attempts: - self.reconnect_count += 1 - logger.info( - f"[WebSocket] Reconnecting in {WSConfig.reconnect_interval}s " - f"(attempt {self.reconnect_count}/{WSConfig.max_reconnect_attempts})" - ) - await asyncio.sleep(WSConfig.reconnect_interval) - elif self.reconnect_count >= WSConfig.max_reconnect_attempts: - logger.error("[WebSocket] Max reconnection attempts reached") - break - else: - self.reconnect_count -= 1 - - async def _close_connection(self): - """关闭WebSocket连接""" - if self.websocket: - await self.websocket.close() - self.websocket = None - - async def _send_queue_processor(self): - """处理发送队列中的消息""" - logger.debug("[WebSocket] Send queue processor started") - if not self.send_queue: - logger.error("[WebSocket] Send queue not initialized") - return - - try: - while self.connected and self.websocket: - try: - # 使用超时避免无限等待 - message = await asyncio.wait_for(self.send_queue.get(), timeout=1.0) - - # 批量处理:收集短时间内的多个消息 - messages_to_send = [message] - batch_size = 0 - max_batch_size = 10 # 最大批处理数量 - - # 尝试获取更多消息(非阻塞) - while batch_size < max_batch_size and not self.send_queue.empty(): - try: - additional_msg = self.send_queue.get_nowait() - messages_to_send.append(additional_msg) - batch_size += 1 - except asyncio.QueueEmpty: - break - - # 发送消息 - for msg in messages_to_send: - if self.websocket and self.connected: - try: - message_str = json.dumps(msg, ensure_ascii=False) - await self.websocket.send(message_str) - logger.trace( # type: ignore - f"[WebSocket] Message sent: {msg.get('action', 'unknown')}" - ) - except Exception as e: - logger.error(f"[WebSocket] Failed to send message: {str(e)}") - # 如果发送失败,将消息重新放回队列(可选) - # await self.send_queue.put(msg) - break - - # 在批量发送之间添加小延迟,避免过载 - if batch_size > 5: - await asyncio.sleep(0.001) - - except asyncio.TimeoutError: - # 超时是正常的,继续循环 - continue - except asyncio.CancelledError: - logger.debug("[WebSocket] Send queue processor cancelled") - break - except Exception as e: - logger.error(f"[WebSocket] Error in send queue processor: {str(e)}") - await asyncio.sleep(0.1) - - except Exception as e: - logger.error(f"[WebSocket] Fatal error in send queue processor: {str(e)}") - finally: - logger.debug("[WebSocket] Send queue processor stopped") - - # 消息处理方法 - async def _message_handler(self): - """处理接收到的消息""" - if not self.websocket: - logger.error("[WebSocket] WebSocket connection is None") - return - - try: - async for message in self.websocket: - try: - data = json.loads(message) - await self._process_message(data) - except json.JSONDecodeError: - logger.error(f"[WebSocket] Invalid JSON received: {message}") - except Exception as e: - logger.error(f"[WebSocket] Error processing message: {str(e)}") - except websockets.exceptions.ConnectionClosed: - logger.info("[WebSocket] Message handler stopped - connection closed") - except Exception as e: - logger.error(f"[WebSocket] Message handler error: {str(e)}") - - async def _process_message(self, input_message: Dict[str, Any]): - """处理收到的消息""" - message_type = input_message.get("action", "") - data = input_message.get("data", {}) - - if message_type == "pong": - # 处理pong响应(WebSocket层面的连接管理) - self._handle_pong_sync(data) - elif self.task_scheduler: - # 其他消息交给TaskScheduler处理 - if message_type == "job_start": - await self.task_scheduler.handle_job_start(data) - elif message_type == "query_action_state": - await self.task_scheduler.handle_query_state(data) - elif message_type == "cancel_action": - await self.task_scheduler.handle_cancel_action(data) - elif message_type == "": - return - else: - logger.debug(f"[WebSocket] Unknown message: {input_message}") - else: - logger.warning(f"[WebSocket] Task scheduler not available for message: {message_type}") - - def _handle_pong_sync(self, pong_data: Dict[str, Any]): - """同步处理pong响应""" - host_node = HostNode.get_instance(0) - if host_node: - host_node.handle_pong_response(pong_data) - - # MessageSender接口实现 - async def send_message(self, message: Dict[str, Any]) -> None: - """内部发送消息方法,将消息放入发送队列""" - if not self.connected: - logger.warning("[WebSocket] Not connected, cannot send message") - return - - # 检查发送队列是否已初始化 - if not self.send_queue: - logger.warning("[WebSocket] Send queue not initialized, cannot send message") - return - - try: - # 尝试将消息放入队列(非阻塞) - self.send_queue.put_nowait(message) - logger.trace(f"[WebSocket] Message queued: {message['action']}") # type: ignore - except asyncio.QueueFull: - logger.error(f"[WebSocket] Send queue full, dropping message: {message['action']}") - # 可选:在队列满时采取其他策略,如等待或丢弃旧消息 - - def is_connected(self) -> bool: - """检查是否已连接(TaskScheduler调用的接口)""" - return self.connected and not self.is_disabled - - # 基类方法实现 - def publish_device_status(self, device_status: dict, device_id: str, property_name: str) -> None: - """发布设备状态""" - if self.is_disabled or not self.connected: - return - message = { - "action": "device_status", - "data": { - "device_id": device_id, - "data": { - "property_name": property_name, - "status": device_status.get(device_id, {}).get(property_name), - "timestamp": time.time(), - }, - }, - } - - # 检查是否在同一个事件循环中 - try: - current_loop = asyncio.get_running_loop() - if current_loop == self.event_loop: - # 在同一个事件循环中,直接创建任务 - asyncio.create_task(self.send_message(message)) - else: - # 不在同一个事件循环中,使用 run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) - except RuntimeError: - # 没有运行中的事件循环,使用 run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) - - logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}") - - def publish_job_status( - self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[dict] = None - ) -> None: - """发布作业状态(转发给TaskScheduler)""" - if self.task_scheduler: - self.task_scheduler.publish_job_status(feedback_data, item, status, return_info) - else: - logger.debug(f"[WebSocket] Task scheduler not available for job status: {item.job_id}") + logger.debug(f"[WebSocketClient] Job status published: {item.job_id[:4]} - {status}") def send_ping(self, ping_id: str, timestamp: float) -> None: """发送ping消息""" - if self.is_disabled or not self.connected: - logger.warning("[WebSocket] Not connected, cannot send ping") + if self.is_disabled or not self.is_connected(): + logger.warning("[WebSocketClient] Not connected, cannot send ping") return + message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}} - - # 检查是否在同一个事件循环中 - try: - current_loop = asyncio.get_running_loop() - if current_loop == self.event_loop: - # 在同一个事件循环中,直接创建任务 - asyncio.create_task(self.send_message(message)) - else: - # 不在同一个事件循环中,使用 run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) - except RuntimeError: - # 没有运行中的事件循环,使用 run_coroutine_threadsafe - asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop) - - logger.debug(f"[WebSocket] Ping sent: {ping_id}") + self.message_processor.send_message(message) + logger.debug(f"[WebSocketClient] Ping sent: {ping_id}") def cancel_goal(self, job_id: str) -> None: - """取消指定的任务(转发给TaskScheduler)""" - logger.debug(f"[WebSocket] Received cancel goal request for job_id: {job_id}") - if self.task_scheduler: - self.task_scheduler.cancel_goal(job_id) - logger.debug(f"[WebSocket] Forwarded cancel goal to TaskScheduler for job_id: {job_id}") + """取消指定的任务""" + logger.debug(f"[WebSocketClient] Cancel goal request for job_id: {job_id[:4]}") + success = self.device_manager.cancel_job(job_id) + if success: + logger.info(f"[WebSocketClient] Job {job_id[:4]} cancelled successfully") else: - logger.debug(f"[WebSocket] Task scheduler not available for cancel goal: {job_id}") + logger.warning(f"[WebSocketClient] Failed to cancel job {job_id[:4]}")