add server timeout

This commit is contained in:
Xuwznln
2025-09-16 09:47:06 +08:00
parent 779c9693d9
commit 4888f02c09

View File

@@ -30,6 +30,13 @@ from unilabos.config.config import WSConfig, HTTPConfig, BasicConfig
from unilabos.utils import logger from unilabos.utils import logger
def format_job_log(job_id: str, task_id: str = "", device_id: str = "", action_name: str = "") -> str:
"""格式化job日志信息jobid[:4]-taskid[:4] device_id/action_name"""
job_part = f"{job_id[:4]}-{task_id[:4]}" if task_id else job_id[:4]
device_part = f"{device_id}/{action_name}" if device_id and action_name else ""
return f"{job_part} {device_part}".strip()
class JobStatus(Enum): class JobStatus(Enum):
"""任务状态枚举""" """任务状态枚举"""
@@ -65,11 +72,20 @@ class JobInfo:
status: JobStatus status: JobStatus
start_time: float start_time: float
last_update_time: float = field(default_factory=time.time) last_update_time: float = field(default_factory=time.time)
ready_timeout: Optional[float] = None # READY状态的超时时间
def update_timestamp(self): def update_timestamp(self):
"""更新最后更新时间""" """更新最后更新时间"""
self.last_update_time = time.time() self.last_update_time = time.time()
def set_ready_timeout(self, timeout_seconds: int = 10):
"""设置READY状态超时时间"""
self.ready_timeout = time.time() + timeout_seconds
def is_ready_timeout(self) -> bool:
"""检查READY状态是否超时"""
return self.status == JobStatus.READY and self.ready_timeout is not None and time.time() > self.ready_timeout
@dataclass @dataclass
class WebSocketMessage: class WebSocketMessage:
@@ -107,7 +123,8 @@ class DeviceActionManager:
self.device_queues[device_key] = [] self.device_queues[device_key] = []
job_info.status = JobStatus.QUEUE job_info.status = JobStatus.QUEUE
self.device_queues[device_key].append(job_info) self.device_queues[device_key].append(job_info)
logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} queued for {device_key}") job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.info(f"[DeviceActionManager] Job {job_log} queued for {device_key}")
return False return False
# 检查是否有排队的任务 # 检查是否有排队的任务
@@ -115,15 +132,18 @@ class DeviceActionManager:
# 有排队的任务,加入队列末尾 # 有排队的任务,加入队列末尾
job_info.status = JobStatus.QUEUE job_info.status = JobStatus.QUEUE
self.device_queues[device_key].append(job_info) self.device_queues[device_key].append(job_info)
logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} queued for {device_key}") job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.info(f"[DeviceActionManager] Job {job_log} queued for {device_key}")
return False return False
# 没有正在执行或排队的任务,可以立即执行 # 没有正在执行或排队的任务,可以立即执行
# 将其状态设为READY并占位防止后续job也被判断为free # 将其状态设为READY并占位防止后续job也被判断为free
job_info.status = JobStatus.READY job_info.status = JobStatus.READY
job_info.update_timestamp() job_info.update_timestamp()
job_info.set_ready_timeout(10) # 设置10秒超时
self.active_jobs[device_key] = job_info self.active_jobs[device_key] = job_info
logger.info(f"[DeviceActionManager] Job {job_info.job_id[:4]} can start immediately for {device_key}") job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.info(f"[DeviceActionManager] Job {job_log} can start immediately for {device_key}")
return True return True
def start_job(self, job_id: str) -> bool: def start_job(self, job_id: str) -> bool:
@@ -141,21 +161,23 @@ class DeviceActionManager:
# 检查job的状态是否正确 # 检查job的状态是否正确
if job_info.status != JobStatus.READY: if job_info.status != JobStatus.READY:
logger.error( job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
f"[DeviceActionManager] Job {job_id[:4]} is not in READY status, current: {job_info.status}" logger.error(f"[DeviceActionManager] Job {job_log} is not in READY status, current: {job_info.status}")
)
return False return False
# 检查设备上是否是这个job # 检查设备上是否是这个job
if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id: if device_key not in self.active_jobs or self.active_jobs[device_key].job_id != job_id:
logger.error(f"[DeviceActionManager] Job {job_id[:4]} is not the active job for {device_key}") job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.error(f"[DeviceActionManager] Job {job_log} is not the active job for {device_key}")
return False return False
# 开始执行任务将状态从READY转换为STARTED # 开始执行任务将状态从READY转换为STARTED
job_info.status = JobStatus.STARTED job_info.status = JobStatus.STARTED
job_info.update_timestamp() job_info.update_timestamp()
job_info.ready_timeout = None # 清除超时时间
logger.info(f"[DeviceActionManager] Job {job_id[:4]} started for {device_key}") job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.info(f"[DeviceActionManager] Job {job_log} started for {device_key}")
return True return True
def end_job(self, job_id: str) -> Optional[JobInfo]: def end_job(self, job_id: str) -> Optional[JobInfo]:
@@ -177,9 +199,11 @@ class DeviceActionManager:
job_info.update_timestamp() job_info.update_timestamp()
# 从all_jobs中移除已结束的job # 从all_jobs中移除已结束的job
del self.all_jobs[job_id] del self.all_jobs[job_id]
logger.info(f"[DeviceActionManager] Job {job_id[:4]} ended for {device_key}") job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.info(f"[DeviceActionManager] Job {job_log} ended for {device_key}")
else: else:
logger.warning(f"[DeviceActionManager] Job {job_id[:4]} was not active for {device_key}") job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.warning(f"[DeviceActionManager] Job {job_log} was not active for {device_key}")
# 检查队列中是否有等待的任务 # 检查队列中是否有等待的任务
if device_key in self.device_queues and self.device_queues[device_key]: if device_key in self.device_queues and self.device_queues[device_key]:
@@ -187,8 +211,12 @@ class DeviceActionManager:
# 将下一个job设置为READY状态并放入active_jobs # 将下一个job设置为READY状态并放入active_jobs
next_job.status = JobStatus.READY next_job.status = JobStatus.READY
next_job.update_timestamp() next_job.update_timestamp()
next_job.set_ready_timeout(10) # 设置10秒超时
self.active_jobs[device_key] = next_job self.active_jobs[device_key] = next_job
logger.info(f"[DeviceActionManager] Next job {next_job.job_id[:4]} can start for {device_key}") next_job_log = format_job_log(
next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
)
logger.info(f"[DeviceActionManager] Next job {next_job_log} can start for {device_key}")
return next_job return next_job
return None return None
@@ -229,7 +257,8 @@ class DeviceActionManager:
# job_info.status = JobStatus.ENDED # job_info.status = JobStatus.ENDED
# # 从all_jobs中移除 # # 从all_jobs中移除
# del self.all_jobs[job_id] # del self.all_jobs[job_id]
# logger.info(f"[DeviceActionManager] Active job {job_id[:4]} cancelled for {device_key}") # job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
# logger.info(f"[DeviceActionManager] Active job {job_log} cancelled for {device_key}")
# # 启动下一个任务 # # 启动下一个任务
# if device_key in self.device_queues and self.device_queues[device_key]: # if device_key in self.device_queues and self.device_queues[device_key]:
@@ -237,8 +266,11 @@ class DeviceActionManager:
# # 将下一个job设置为READY状态并放入active_jobs # # 将下一个job设置为READY状态并放入active_jobs
# next_job.status = JobStatus.READY # next_job.status = JobStatus.READY
# next_job.update_timestamp() # next_job.update_timestamp()
# next_job.set_ready_timeout(10)
# self.active_jobs[device_key] = next_job # self.active_jobs[device_key] = next_job
# logger.info(f"[DeviceActionManager] Next job {next_job.job_id[:4]} can start after cancel") # next_job_log = format_job_log(next_job.job_id, next_job.task_id,
# next_job.device_id, next_job.action_name)
# logger.info(f"[DeviceActionManager] Next job {next_job_log} can start after cancel")
# return True # return True
pass pass
@@ -250,10 +282,14 @@ class DeviceActionManager:
job_info.status = JobStatus.ENDED job_info.status = JobStatus.ENDED
# 从all_jobs中移除 # 从all_jobs中移除
del self.all_jobs[job_id] del self.all_jobs[job_id]
logger.info(f"[DeviceActionManager] Queued job {job_id[:4]} cancelled for {device_key}") job_log = format_job_log(
job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name
)
logger.info(f"[DeviceActionManager] Queued job {job_log} cancelled for {device_key}")
return True return True
logger.warning(f"[DeviceActionManager] Job {job_id[:4]} not found in active or queued jobs") job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.warning(f"[DeviceActionManager] Job {job_log} not found in active or queued jobs")
return False return False
def cancel_jobs_by_task_id(self, task_id: str) -> List[str]: def cancel_jobs_by_task_id(self, task_id: str) -> List[str]:
@@ -282,6 +318,27 @@ class DeviceActionManager:
return cancelled_job_ids return cancelled_job_ids
def check_ready_timeouts(self) -> List[JobInfo]:
"""检查READY状态超时的任务仅检测不处理"""
timeout_jobs = []
with self.lock:
# 统计READY状态的任务数量
ready_jobs_count = sum(1 for job in self.active_jobs.values() if job.status == JobStatus.READY)
if ready_jobs_count > 0:
logger.trace(f"[DeviceActionManager] Checking {ready_jobs_count} READY jobs for timeout") # type: ignore # noqa: E501
# 找到所有超时的READY任务只检测不处理
for job_info in self.active_jobs.values():
if job_info.is_ready_timeout():
timeout_jobs.append(job_info)
job_log = format_job_log(
job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name
)
logger.warning(f"[DeviceActionManager] Job {job_log} READY timeout detected")
return timeout_jobs
class MessageProcessor: class MessageProcessor:
"""消息处理线程 - 处理WebSocket消息划分任务执行和任务队列""" """消息处理线程 - 处理WebSocket消息划分任务执行和任务队列"""
@@ -541,19 +598,19 @@ class MessageProcessor:
# 添加到设备管理器 # 添加到设备管理器
can_start_immediately = self.device_manager.add_queue_request(job_info) can_start_immediately = self.device_manager.add_queue_request(job_info)
# 发送响应 job_log = format_job_log(job_id, task_id, device_id, action_name)
if can_start_immediately: if can_start_immediately:
# 可以立即开始 # 可以立即开始
await self._send_action_state_response( await self._send_action_state_response(
device_id, action_name, task_id, job_id, "query_action_status", True, 0 device_id, action_name, task_id, job_id, "query_action_status", True, 0
) )
logger.info(f"[MessageProcessor] Job {job_id[:4]} can start immediately") logger.info(f"[MessageProcessor] Job {job_log} can start immediately")
else: else:
# 需要排队 # 需要排队
await self._send_action_state_response( await self._send_action_state_response(
device_id, action_name, task_id, job_id, "query_action_status", False, 10 device_id, action_name, task_id, job_id, "query_action_status", False, 10
) )
logger.info(f"[MessageProcessor] Job {job_id[:4]} queued") logger.info(f"[MessageProcessor] Job {job_log} queued")
# 通知QueueProcessor有新的队列更新 # 通知QueueProcessor有新的队列更新
if self.queue_processor: if self.queue_processor:
@@ -564,13 +621,13 @@ class MessageProcessor:
try: try:
req = JobAddReq(**data) req = JobAddReq(**data)
# 开始执行任务 job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action)
success = self.device_manager.start_job(req.job_id) success = self.device_manager.start_job(req.job_id)
if not success: if not success:
logger.error(f"[MessageProcessor] Failed to start job {req.job_id[:4]}") logger.error(f"[MessageProcessor] Failed to start job {job_log}")
return return
logger.info(f"[MessageProcessor] Starting job {req.job_id[:4]} for {req.device_id}/{req.action}") logger.info(f"[MessageProcessor] Starting job {job_log}")
# 创建HostNode任务 # 创建HostNode任务
device_action_key = f"/devices/{req.device_id}/{req.action}" device_action_key = f"/devices/{req.device_id}/{req.action}"
@@ -602,7 +659,8 @@ class MessageProcessor:
# job_start出错时需要通过正确的publish_job_status方法来处理 # job_start出错时需要通过正确的publish_job_status方法来处理
if "req" in locals() and "queue_item" in locals(): if "req" in locals() and "queue_item" in locals():
logger.info(f"[MessageProcessor] Publishing failed status for job {req.job_id[:4]}") job_log = format_job_log(req.job_id, req.task_id, req.device_id, req.action)
logger.info(f"[MessageProcessor] Publishing failed status for job {job_log}")
if self.websocket_client: if self.websocket_client:
# 使用完整的错误信息,与原版本一致 # 使用完整的错误信息,与原版本一致
@@ -639,7 +697,10 @@ class MessageProcessor:
True, True,
0, 0,
) )
logger.info(f"[MessageProcessor] Started next job {next_job.job_id[:4]} after error") next_job_log = format_job_log(
next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name
)
logger.info(f"[MessageProcessor] Started next job {next_job_log} after error")
# 通知QueueProcessor有队列更新 # 通知QueueProcessor有队列更新
if self.queue_processor: if self.queue_processor:
@@ -655,6 +716,15 @@ class MessageProcessor:
logger.info(f"[MessageProcessor] Cancel request - task_id: {task_id}, job_id: {job_id}") logger.info(f"[MessageProcessor] Cancel request - task_id: {task_id}, job_id: {job_id}")
if job_id: if job_id:
# 获取job信息用于日志
job_info = self.device_manager.get_job_info(job_id)
job_log = format_job_log(
job_id,
job_info.task_id if job_info else "",
job_info.device_id if job_info else "",
job_info.action_name if job_info else "",
)
# 按job_id取消单个job # 按job_id取消单个job
success = self.device_manager.cancel_job(job_id) success = self.device_manager.cancel_job(job_id)
if success: if success:
@@ -662,13 +732,13 @@ class MessageProcessor:
host_node = HostNode.get_instance(0) host_node = HostNode.get_instance(0)
if host_node: if host_node:
host_node.cancel_goal(job_id) host_node.cancel_goal(job_id)
logger.info(f"[MessageProcessor] Job {job_id[:4]} cancelled") logger.info(f"[MessageProcessor] Job {job_log} cancelled")
# 通知QueueProcessor有队列更新 # 通知QueueProcessor有队列更新
if self.queue_processor: if self.queue_processor:
self.queue_processor.notify_queue_update() self.queue_processor.notify_queue_update()
else: else:
logger.warning(f"[MessageProcessor] Failed to cancel job {job_id[:4]}") logger.warning(f"[MessageProcessor] Failed to cancel job {job_log}")
elif task_id: elif task_id:
# 按task_id取消所有相关job # 按task_id取消所有相关job
@@ -732,6 +802,7 @@ class QueueProcessor:
def __init__(self, device_manager: DeviceActionManager, message_processor: MessageProcessor): def __init__(self, device_manager: DeviceActionManager, message_processor: MessageProcessor):
self.device_manager = device_manager self.device_manager = device_manager
self.message_processor = message_processor self.message_processor = message_processor
self.websocket_client = None # 延迟设置
# 线程控制 # 线程控制
self.is_running = False self.is_running = False
@@ -742,6 +813,10 @@ class QueueProcessor:
logger.info("[QueueProcessor] Initialized") logger.info("[QueueProcessor] Initialized")
def set_websocket_client(self, websocket_client: "WebSocketClient"):
"""设置WebSocket客户端引用"""
self.websocket_client = websocket_client
def start(self) -> None: def start(self) -> None:
"""启动队列处理线程""" """启动队列处理线程"""
if self.is_running: if self.is_running:
@@ -766,6 +841,36 @@ class QueueProcessor:
while self.is_running: while self.is_running:
try: try:
# 检查READY状态超时的任务
timeout_jobs = self.device_manager.check_ready_timeouts()
if timeout_jobs:
logger.info(f"[QueueProcessor] Found {len(timeout_jobs)} READY jobs that timed out")
# 为超时的job发布失败状态通过正常job完成流程处理
for timeout_job in timeout_jobs:
timeout_item = QueueItem(
task_type="job_call_back_status",
device_id=timeout_job.device_id,
action_name=timeout_job.action_name,
task_id=timeout_job.task_id,
job_id=timeout_job.job_id,
device_action_key=timeout_job.device_action_key,
)
# 发布超时失败状态这会触发正常的job完成流程
if self.websocket_client:
job_log = format_job_log(
timeout_job.job_id, timeout_job.task_id, timeout_job.device_id, timeout_job.action_name
)
logger.info(f"[QueueProcessor] Publishing timeout failure for job {job_log}")
self.websocket_client.publish_job_status(
{},
timeout_item,
"failed",
serialize_result_info("Job READY state timeout after 10 seconds", False, {}),
)
# 立即触发状态更新
self.notify_queue_update()
# 发送正在执行任务的running状态 # 发送正在执行任务的running状态
self._send_running_status() self._send_running_status()
@@ -811,7 +916,8 @@ class QueueProcessor:
}, },
} }
self.message_processor.send_message(message) self.message_processor.send_message(message)
logger.trace(f"[QueueProcessor] Sent running status for job {job_info.job_id[:4]}") # type: ignore job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
logger.trace(f"[QueueProcessor] Sent running status for job {job_log}") # type: ignore
def _send_busy_status(self): def _send_busy_status(self):
"""发送排队任务的busy状态""" """发送排队任务的busy状态"""
@@ -838,14 +944,24 @@ class QueueProcessor:
}, },
} }
success = self.message_processor.send_message(message) success = self.message_processor.send_message(message)
job_log = format_job_log(job_info.job_id, job_info.task_id, job_info.device_id, job_info.action_name)
if success: if success:
logger.debug(f"[QueueProcessor] Sent busy/need_more for queued job {job_info.job_id[:4]}") logger.debug(f"[QueueProcessor] Sent busy/need_more for queued job {job_log}")
else: else:
logger.warning(f"[QueueProcessor] Failed to send busy status for job {job_info.job_id[:4]}") logger.warning(f"[QueueProcessor] Failed to send busy status for job {job_log}")
def handle_job_completed(self, job_id: str, status: str) -> None: def handle_job_completed(self, job_id: str, status: str) -> None:
"""处理任务完成""" """处理任务完成"""
logger.info(f"[QueueProcessor] Job {job_id[:4]} completed with status: {status}") # 获取job信息用于日志
job_info = self.device_manager.get_job_info(job_id)
job_log = format_job_log(
job_id,
job_info.task_id if job_info else "",
job_info.device_id if job_info else "",
job_info.action_name if job_info else "",
)
logger.info(f"[QueueProcessor] Job {job_log} completed with status: {status}")
# 结束任务,获取下一个可执行的任务 # 结束任务,获取下一个可执行的任务
next_job = self.device_manager.end_job(job_id) next_job = self.device_manager.end_job(job_id)
@@ -865,7 +981,8 @@ class QueueProcessor:
}, },
} }
self.message_processor.send_message(message) self.message_processor.send_message(message)
logger.info(f"[QueueProcessor] Notified next job {next_job.job_id[:4]} can start") next_job_log = format_job_log(next_job.job_id, next_job.task_id, next_job.device_id, next_job.action_name)
logger.info(f"[QueueProcessor] Notified next job {next_job_log} can start")
# 立即触发下一轮状态检查 # 立即触发下一轮状态检查
self.notify_queue_update() self.notify_queue_update()
@@ -899,6 +1016,7 @@ class WebSocketClient(BaseCommunicationClient):
# 设置相互引用 # 设置相互引用
self.message_processor.set_queue_processor(self.queue_processor) self.message_processor.set_queue_processor(self.queue_processor)
self.message_processor.set_websocket_client(self) self.message_processor.set_websocket_client(self)
self.queue_processor.set_websocket_client(self)
logger.info(f"[WebSocketClient] Client_id: {self.client_id}") logger.info(f"[WebSocketClient] Client_id: {self.client_id}")
@@ -997,7 +1115,7 @@ class WebSocketClient(BaseCommunicationClient):
logger.info(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}") logger.info(f"[WebSocketClient] Intercepting final status for job_id: {item.job_id} - {status}")
# 通知队列处理器job完成 # 通知队列处理器job完成包括timeout的job
self.queue_processor.handle_job_completed(item.job_id, status) self.queue_processor.handle_job_completed(item.job_id, status)
# 发送job状态消息 # 发送job状态消息
@@ -1016,7 +1134,8 @@ class WebSocketClient(BaseCommunicationClient):
} }
self.message_processor.send_message(message) self.message_processor.send_message(message)
logger.debug(f"[WebSocketClient] Job status published: {item.job_id[:4]} - {status}") job_log = format_job_log(item.job_id, item.task_id, item.device_id, item.action_name)
logger.debug(f"[WebSocketClient] Job status published: {job_log} - {status}")
def send_ping(self, ping_id: str, timestamp: float) -> None: def send_ping(self, ping_id: str, timestamp: float) -> None:
"""发送ping消息""" """发送ping消息"""
@@ -1030,9 +1149,18 @@ class WebSocketClient(BaseCommunicationClient):
def cancel_goal(self, job_id: str) -> None: def cancel_goal(self, job_id: str) -> None:
"""取消指定的任务""" """取消指定的任务"""
logger.debug(f"[WebSocketClient] Cancel goal request for job_id: {job_id[:4]}") # 获取job信息用于日志
job_info = self.device_manager.get_job_info(job_id)
job_log = format_job_log(
job_id,
job_info.task_id if job_info else "",
job_info.device_id if job_info else "",
job_info.action_name if job_info else "",
)
logger.debug(f"[WebSocketClient] Cancel goal request for job: {job_log}")
success = self.device_manager.cancel_job(job_id) success = self.device_manager.cancel_job(job_id)
if success: if success:
logger.info(f"[WebSocketClient] Job {job_id[:4]} cancelled successfully") logger.info(f"[WebSocketClient] Job {job_log} cancelled successfully")
else: else:
logger.warning(f"[WebSocketClient] Failed to cancel job {job_id[:4]}") logger.warning(f"[WebSocketClient] Failed to cancel job {job_log}")