ws protocol

This commit is contained in:
Xuwznln
2025-09-02 18:51:27 +08:00
parent f4d4eb06d3
commit 831f4549f9

View File

@@ -60,6 +60,10 @@ class TaskScheduler:
 self.active_jobs = {} # job_id -> 任务信息
 self.cancel_events = {} # job_id -> asyncio.Event for cancellation
+# 立即执行标记字典 - device_id+action_name -> timestamp
+self.immediate_execution_flags = {} # 存储需要立即执行的设备动作组合
+self.immediate_execution_lock = threading.Lock() # 立即执行标记锁
 # 队列处理器
 self.queue_processor_thread = None
 self.queue_running = False
@@ -121,11 +125,16 @@ class TaskScheduler:
 await asyncio.sleep(0.2) # 队列为空时等待
 continue
+with self.immediate_execution_lock:
+expired_keys = [k for k, v in self.immediate_execution_flags.items() if current_time > v]
+for k in expired_keys:
+del self.immediate_execution_flags[k]
+immediate_execution = self.immediate_execution_flags.copy()
 # 处理每个任务
 for item in items_to_process:
 try:
 # 检查是否到了执行时间,是我们本地的执行时间,按顺序填入
-if current_time < item.next_run_time:
+if current_time < item.next_run_time and item.device_action_key not in immediate_execution:
 # 还没到执行时间,保留在队列中(保持原有顺序)
 items_to_requeue.append(item)
 continue
@@ -145,12 +154,14 @@ class TaskScheduler:
 item.next_run_time = current_time + 10 # 10秒后再次执行
 item.retry_count += 1
 items_to_requeue.append(item)
-logger.critical(
-f"[TaskScheduler] Re-queued {item.job_id} {item.task_type} for {item.device_action_key}"
+logger.trace( # type: ignore
+f"[TaskScheduler] Re-queued {item.job_id} {item.task_type} "
+f"for {item.device_action_key}"
 )
 else:
-logger.warning(
-f"[TaskScheduler] Completed {item.job_id} {item.task_type} for {item.device_action_key}"
+logger.debug(
+f"[TaskScheduler] Completed {item.job_id} {item.task_type} "
+f"for {item.device_action_key}"
 )
 except Exception as e:
@@ -212,12 +223,12 @@ class TaskScheduler:
 try:
 # 检查任务是否还在活跃列表中
 if item.job_id not in self.active_jobs:
-logger.critical(f"[TaskScheduler] Job {item.job_id} no longer active")
+logger.debug(f"[TaskScheduler] Job {item.job_id} no longer active")
 return False
 # 检查是否收到取消信号
 if item.job_id in self.cancel_events and self.cancel_events[item.job_id].is_set():
-logger.critical(f"[TaskScheduler] Job {item.job_id} cancelled via cancel event")
+logger.info(f"[TaskScheduler] Job {item.job_id} cancelled via cancel event")
 return False
 # 检查设备状态
@@ -299,7 +310,7 @@ class TaskScheduler:
 await self._publish_device_action_state(
 device_id, action_name, task_id, job_id, "query_action_status", True, 0
 )
-logger.error(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is free, responded immediately")
+logger.debug(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is free, responded immediately")
 host_node = HostNode.get_instance(0)
 if not host_node:
 logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {job_id}")
@@ -335,7 +346,10 @@ class TaskScheduler:
 next_run_time=time.time() + 10, # 10秒后执行
 )
 self.action_queue.append(queue_item)
-logger.error(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is busy, added to polling queue {action_jobs}")
+logger.debug(
+f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is busy, "
+f"added to polling queue {action_jobs}"
+)
 # 立即发送busy状态
 await self._publish_device_action_state(
@@ -350,7 +364,7 @@ class TaskScheduler:
 req = JobAddReq(**data)
 device_action_key = f"/devices/{req.device_id}/{req.action}"
-logger.critical(
+logger.info(
 f"[TaskScheduler] Starting job with job_id: {req.job_id}, "
 f"device: {req.device_id}, action: {req.action}"
 )
@@ -370,9 +384,7 @@ class TaskScheduler:
 try:
 # 启动callback定时发送
-await self._start_job_callback(
-req.job_id, req.device_id, req.action, req.task_id, device_action_key
-)
+await self._start_job_callback(req.job_id, req.device_id, req.action, req.task_id, device_action_key)
 # 创建兼容HostNode的QueueItem对象
 job_queue_item = QueueItem(
@@ -405,7 +417,7 @@ class TaskScheduler:
 host_node = HostNode.get_instance(0)
 if host_node:
 host_node._device_action_status[device_action_key].job_ids.pop(req.job_id, None)
-logger.critical(f"[TaskScheduler] Cleaned up failed job from HostNode: {req.job_id}")
+logger.warning(f"[TaskScheduler] Cleaned up failed job from HostNode: {req.job_id}")
 except Exception as e:
 logger.error(f"[TaskScheduler] Error handling job start: {str(e)}")
@@ -414,7 +426,7 @@ class TaskScheduler:
 task_id = data.get("task_id")
 job_id = data.get("job_id")
-logger.critical(f"[TaskScheduler] Handling cancel action request - task_id: {task_id}, job_id: {job_id}")
+logger.debug(f"[TaskScheduler] Handling cancel action request - task_id: {task_id}, job_id: {job_id}")
 if not task_id and not job_id:
 logger.error("[TaskScheduler] cancel_action missing both task_id and job_id")
@@ -422,49 +434,49 @@ class TaskScheduler:
 # 通过job_id取消
 if job_id:
-logger.critical(f"[TaskScheduler] Cancelling job by job_id: {job_id}")
+logger.info(f"[TaskScheduler] Cancelling job by job_id: {job_id}")
 # 设置取消事件
 if job_id in self.cancel_events:
 self.cancel_events[job_id].set()
-logger.critical(f"[TaskScheduler] Set cancel event for job_id: {job_id}")
+logger.debug(f"[TaskScheduler] Set cancel event for job_id: {job_id}")
 else:
 logger.warning(f"[TaskScheduler] Cancel event not found for job_id: {job_id}")
 # 停止job callback并发送取消状态
 if job_id in self.active_jobs:
-logger.critical(f"[TaskScheduler] Found active job for cancellation: {job_id}")
+logger.debug(f"[TaskScheduler] Found active job for cancellation: {job_id}")
 # 调用HostNode的cancel_goal
 host_node = HostNode.get_instance(0)
 if host_node:
 host_node.cancel_goal(job_id)
-logger.critical(f"[TaskScheduler] Cancelled goal in HostNode for job_id: {job_id}")
+logger.info(f"[TaskScheduler] Cancelled goal in HostNode for job_id: {job_id}")
 else:
 logger.error(f"[TaskScheduler] HostNode not available for cancel goal: {job_id}")
 # 停止callback并发送取消状态
 await self._stop_job_callback(job_id, "cancelled", "Job was cancelled by user request")
-logger.critical(f"[TaskScheduler] Stopped job callback and sent cancel status for job_id: {job_id}")
+logger.info(f"[TaskScheduler] Stopped job callback and sent cancel status for job_id: {job_id}")
 else:
 logger.warning(f"[TaskScheduler] Job not found in active jobs for cancellation: {job_id}")
 # 通过task_id取消需要查找对应的job_id
 if task_id and not job_id:
-logger.critical(f"[TaskScheduler] Cancelling jobs by task_id: {task_id}")
+logger.debug(f"[TaskScheduler] Cancelling jobs by task_id: {task_id}")
 jobs_to_cancel = []
 for jid, job_info in self.active_jobs.items():
 if job_info.get("task_id") == task_id:
 jobs_to_cancel.append(jid)
-logger.critical(
+logger.debug(
 f"[TaskScheduler] Found {len(jobs_to_cancel)} jobs to cancel for task_id {task_id}: {jobs_to_cancel}"
 )
 for jid in jobs_to_cancel:
-logger.critical(f"[TaskScheduler] Recursively cancelling job_id: {jid} for task_id: {task_id}")
+logger.debug(f"[TaskScheduler] Recursively cancelling job_id: {jid} for task_id: {task_id}")
 # 递归调用自身来取消每个job
 await self.handle_cancel_action({"job_id": jid})
-logger.critical(f"[TaskScheduler] Completed cancel action handling - task_id: {task_id}, job_id: {job_id}")
+logger.debug(f"[TaskScheduler] Completed cancel action handling - task_id: {task_id}, job_id: {job_id}")
 # job管理方法
 async def _start_job_callback(
@@ -472,7 +484,7 @@ class TaskScheduler:
 ) -> None:
 """启动job的callback定时发送"""
 if job_id not in self.active_jobs:
-logger.warning(f"[TaskScheduler] Job not found in active jobs when starting callback: {job_id}")
+logger.debug(f"[TaskScheduler] Job not found in active jobs when starting callback: {job_id}")
 return
 # 检查是否已经启动过callback
@@ -497,15 +509,13 @@ class TaskScheduler:
 with self.action_queue_lock:
 self.action_queue.append(queue_item)
 else:
-logger.warning(f"[TaskScheduler] Action queue not available for job callback: {job_id}")
+logger.debug(f"[TaskScheduler] Action queue not available for job callback: {job_id}")
 async def _stop_job_callback(self, job_id: str, final_status: str, return_info: Optional[str] = None) -> None:
 """停止job的callback定时发送并发送最终结果"""
-logger.critical(
-f"[TaskScheduler] Stopping job callback for job_id: {job_id} with final status: {final_status}"
-)
+logger.info(f"[TaskScheduler] Stopping job callback for job_id: {job_id} with final status: {final_status}")
 if job_id not in self.active_jobs:
-logger.warning(f"[TaskScheduler] Job {job_id} not found in active jobs when stopping callback")
+logger.debug(f"[TaskScheduler] Job {job_id} not found in active jobs when stopping callback")
 return
 job_info = self.active_jobs[job_id]
@@ -514,22 +524,20 @@ class TaskScheduler:
 task_id = job_info["task_id"]
 device_action_key = job_info["device_action_key"]
-logger.critical(
+logger.debug(
 f"[TaskScheduler] Job {job_id} details - device: {device_id}, action: {action_name}, task: {task_id}"
 )
 # 移除活跃任务和取消事件这会让队列处理器自动停止callback
 self.active_jobs.pop(job_id, None)
 self.cancel_events.pop(job_id, None)
-logger.critical(f"[TaskScheduler] Removed job {job_id} from active jobs and cancel events")
+logger.debug(f"[TaskScheduler] Removed job {job_id} from active jobs and cancel events")
 # 发送最终的callback状态
 await self._publish_device_action_state(
 device_id, action_name, task_id, job_id, "job_call_back_status", True, 0
 )
-logger.critical(
-f"[TaskScheduler] Completed stopping job callback for {job_id} with final status: {final_status}"
-)
+logger.debug(f"[TaskScheduler] Completed stopping job callback for {job_id} with final status: {final_status}")
# 外部接口方法 # 外部接口方法
def publish_job_status( def publish_job_status(
@@ -537,7 +545,7 @@ class TaskScheduler:
 ) -> None:
 """发布作业状态拦截最终结果给HostNode调用的接口"""
 if not self.message_sender.is_connected():
-logger.warning(f"[TaskScheduler] Not connected, cannot publish job status for job_id: {item.job_id}")
+logger.debug(f"[TaskScheduler] Not connected, cannot publish job status for job_id: {item.job_id}")
 return
 # 拦截最终结果状态
@@ -545,13 +553,16 @@ class TaskScheduler:
 host_node = HostNode.get_instance(0)
 if host_node:
 host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id)
-logger.critical(f"[TaskScheduler] Intercepting final status for job_id: {item.job_id} - {status}")
+logger.info(f"[TaskScheduler] Intercepting final status for job_id: {item.job_id} - {status}")
+# 给其他同名action至少执行一次的机会
+with self.immediate_execution_lock:
+self.immediate_execution_flags[item.device_action_key] = time.time() + 3
 # 如果是最终状态通过_stop_job_callback处理
 if self.message_sender.event_loop:
-asyncio.run_coroutine_threadsafe(self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop).result()
-logger.critical(f"[TaskScheduler] Scheduled final callback stop for job_id: {item.job_id}")
-logger.critical(f"[TaskScheduler] Intercepted final job status: {item.job_id} - {status}")
+asyncio.run_coroutine_threadsafe(
+self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop
+).result()
+# 执行结果信息上传
 message = {
 "action": "job_status",
 "data": {
@@ -570,24 +581,24 @@ class TaskScheduler:
 loop.create_task(self.message_sender.send_message(message))
 except RuntimeError:
 asyncio.run(self.message_sender.send_message(message))
-logger.critical(f"[TaskScheduler] Executed message send for job_id: {item.job_id} - {status}")
 logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore
 def cancel_goal(self, job_id: str) -> None:
 """取消指定的任务(给外部调用的接口)"""
-logger.critical(f"[TaskScheduler] External cancel request for job_id: {job_id}")
+logger.debug(f"[TaskScheduler] External cancel request for job_id: {job_id}")
 if job_id in self.cancel_events:
-logger.critical(f"[TaskScheduler] Found cancel event for job_id: {job_id}, processing cancellation")
+logger.debug(f"[TaskScheduler] Found cancel event for job_id: {job_id}, processing cancellation")
 try:
 loop = asyncio.get_event_loop()
 loop.create_task(self.handle_cancel_action({"job_id": job_id}))
-logger.critical(f"[TaskScheduler] Scheduled cancel action for job_id: {job_id}")
+logger.debug(f"[TaskScheduler] Scheduled cancel action for job_id: {job_id}")
 except RuntimeError:
 asyncio.run(self.handle_cancel_action({"job_id": job_id}))
-logger.critical(f"[TaskScheduler] Executed cancel action for job_id: {job_id}")
-logger.critical(f"[TaskScheduler] Initiated cancel for job_id: {job_id}")
+logger.debug(f"[TaskScheduler] Executed cancel action for job_id: {job_id}")
+logger.debug(f"[TaskScheduler] Initiated cancel for job_id: {job_id}")
 else:
-logger.warning(f"[TaskScheduler] Job {job_id} not found in cancel events for cancellation")
+logger.debug(f"[TaskScheduler] Job {job_id} not found in cancel events for cancellation")
 class WebSocketClient(BaseCommunicationClient):
@@ -865,7 +876,7 @@ class WebSocketClient(BaseCommunicationClient):
 if self.task_scheduler:
 self.task_scheduler.publish_job_status(feedback_data, item, status, return_info)
 else:
-logger.warning(f"[WebSocket] Task scheduler not available for job status: {item.job_id}")
+logger.debug(f"[WebSocket] Task scheduler not available for job status: {item.job_id}")
 def send_ping(self, ping_id: str, timestamp: float) -> None:
 """发送ping消息"""
@@ -879,9 +890,9 @@ class WebSocketClient(BaseCommunicationClient):
 def cancel_goal(self, job_id: str) -> None:
 """取消指定的任务转发给TaskScheduler"""
-logger.critical(f"[WebSocket] Received cancel goal request for job_id: {job_id}")
+logger.debug(f"[WebSocket] Received cancel goal request for job_id: {job_id}")
 if self.task_scheduler:
 self.task_scheduler.cancel_goal(job_id)
-logger.critical(f"[WebSocket] Forwarded cancel goal to TaskScheduler for job_id: {job_id}")
+logger.debug(f"[WebSocket] Forwarded cancel goal to TaskScheduler for job_id: {job_id}")
 else:
-logger.warning(f"[WebSocket] Task scheduler not available for cancel goal: {job_id}")
+logger.debug(f"[WebSocket] Task scheduler not available for cancel goal: {job_id}")