ws test version 2

Xuwznln
2025-09-02 18:29:05 +08:00
parent e3b8164f6b
commit f4d4eb06d3
3 changed files with 42 additions and 126 deletions

View File

@@ -53,6 +53,7 @@ class JobAddReq(BaseModel):
     action: str = Field(examples=["_execute_driver_command_async"], description="action name", default="")
     action_type: str = Field(examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action name", default="")
     action_args: dict = Field(examples=[{'string': 'string'}], description="action name", default="")
+    task_id: str = Field(examples=["task_id"], description="task uuid")
     job_id: str = Field(examples=["job_id"], description="goal uuid")
     node_id: str = Field(examples=["node_id"], description="node uuid")
     server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info")
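For reference, a minimal sketch (assuming the pydantic BaseModel/Field usage shown above) of how a caller now builds the request with the new task_id field. Only the fields visible in this hunk are included; the simplified defaults and example values are illustrative, not repository code.

# Hedged sketch of the updated JobAddReq; fields limited to those visible in the hunk above.
from pydantic import BaseModel, Field

class JobAddReq(BaseModel):
    action: str = Field(default="", examples=["_execute_driver_command_async"], description="action name")
    action_type: str = Field(default="", examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type")
    action_args: dict = Field(default_factory=dict, examples=[{"string": "string"}], description="action args")
    task_id: str = Field(examples=["task_id"], description="task uuid")  # new in this commit
    job_id: str = Field(examples=["job_id"], description="goal uuid")
    node_id: str = Field(examples=["node_id"], description="node uuid")
    server_info: dict = Field(default_factory=dict, examples=[{"send_timestamp": 1717000000.0}], description="server info")

# Callers must now supply a task uuid alongside the job uuid (values here are made up).
req = JobAddReq(
    action="_execute_driver_command_async",
    action_type="unilabos_msgs.action._str_single_input.StrSingleInput",
    action_args={"string": "hello"},
    task_id="t-0001",
    job_id="j-0001",
    node_id="n-0001",
    server_info={"send_timestamp": 1717000000.0},
)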

View File

@@ -60,8 +60,6 @@ class TaskScheduler:
         self.active_jobs = {} # job_id -> 任务信息
         self.cancel_events = {} # job_id -> asyncio.Event for cancellation
-        self.just_free_sets = {} # device_name + action_name -> end_timestamp
         # 队列处理器
         self.queue_processor_thread = None
         self.queue_running = False
@@ -144,15 +142,15 @@ class TaskScheduler:
                 # 如果需要继续,放入重新排队列表
                 if should_continue:
-                    item.next_run_time = current_time + 10.0 # 10秒后再次执行
+                    item.next_run_time = current_time + 10 # 10秒后再次执行
                     item.retry_count += 1
                     items_to_requeue.append(item)
                     logger.critical(
-                        f"[TaskScheduler] Re-queued {item.task_type} for {item.device_action_key}"
+                        f"[TaskScheduler] Re-queued {item.job_id} {item.task_type} for {item.device_action_key}"
                     )
                 else:
-                    logger.critical(
-                        f"[TaskScheduler] Completed {item.task_type} for {item.device_action_key}"
+                    logger.warning(
+                        f"[TaskScheduler] Completed {item.job_id} {item.task_type} for {item.device_action_key}"
                     )
             except Exception as e:
@@ -192,21 +190,16 @@ class TaskScheduler:
             # 发送状态报告
             if free:
                 # 设备空闲,发送最终状态并停止
+                # 下面要增加和handle_query_state相同的逻辑
+                host_node._device_action_status[item.device_action_key].job_ids[item.job_id] = time.time()
                 await self._publish_device_action_state(
-                    item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", True, 0.0
+                    item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", True, 0
                 )
-                self.just_free_sets[item.device_action_key] = time.time() + 30.0
                 return False # 停止继续监控
             else:
-                if item.device_action_key in self.just_free_sets:
-                    if time.time() < self.just_free_sets[item.device_action_key]:
-                        return True # 继续监控
-                    else:
-                        del self.just_free_sets[item.device_action_key]
                 # 设备忙碌,发送状态并继续监控
                 await self._publish_device_action_state(
-                    item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", False, 10.0
+                    item.device_id, item.action_name, item.task_id, item.job_id, "query_action_status", False, 10
                 )
                 return True # 继续监控
@@ -217,7 +210,6 @@ class TaskScheduler:
     async def _process_job_callback_item(self, item: QueueItem) -> bool:
         """处理job_call_back_status类型的队列项返回True表示需要继续False表示可以停止"""
         try:
-            logger.critical(f"[TaskScheduler] Processing job callback item for job_id: {item.job_id}")
             # 检查任务是否还在活跃列表中
             if item.job_id not in self.active_jobs:
                 logger.critical(f"[TaskScheduler] Job {item.job_id} no longer active")
@@ -239,21 +231,15 @@ class TaskScheduler:
             action_jobs = len(host_node._device_action_status[item.device_action_key].job_ids)
             free = not bool(action_jobs)
-            logger.critical(
-                f"[TaskScheduler] Job {item.job_id} callback status check - free: {free}, action_jobs: {action_jobs}"
-            )
             # 发送job_call_back_status状态
             await self._publish_device_action_state(
-                item.device_id, item.action_name, item.task_id, item.job_id, "job_call_back_status", free, 10.0
+                item.device_id, item.action_name, item.task_id, item.job_id, "job_call_back_status", free, 10
             )
             # 如果任务完成,停止监控
             if free:
-                logger.critical(f"[TaskScheduler] Job {item.job_id} callback monitoring completed - device is free")
                 return False
             else:
-                logger.critical(f"[TaskScheduler] Job {item.job_id} callback monitoring continues - device is busy")
                 return True # 继续监控
         except Exception as e:
@@ -262,13 +248,13 @@ class TaskScheduler:
     # 消息发送方法
     async def _publish_device_action_state(
-        self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: float
+        self, device_id: str, action_name: str, task_id: str, job_id: str, typ: str, free: bool, need_more: int
     ) -> None:
         """发布设备动作状态"""
         message = {
             "action": "report_action_state",
             "data": {
-                "action": typ,
+                "type": typ,
                 "device_id": device_id,
                 "action_name": action_name,
                 "task_id": task_id,
@@ -278,36 +264,6 @@ class TaskScheduler:
             },
         }
         await self.message_sender.send_message(message)
-        logger.critical(f"[TaskScheduler] Published action state: {device_id}/{action_name} - {typ}")
-    async def _publish_final_job_status(
-        self, feedback_data: dict, item: QueueItem, status: str, return_info: Optional[str] = None
-    ) -> None:
-        """发布最终作业状态"""
-        if not self.message_sender.is_connected():
-            logger.warning("[TaskScheduler] Not connected, cannot publish final job status")
-            return
-        # 只处理最终状态
-        if status not in ["completed", "failed", "cancelled"]:
-            logger.critical(f"[TaskScheduler] Ignoring non-final status: {status}")
-            return
-        message = {
-            "action": "job_status",
-            "data": {
-                "job_id": item.job_id,
-                "task_id": item.task_id,
-                "device_id": item.device_id,
-                "action_name": item.action_name,
-                "status": status,
-                "feedback_data": feedback_data,
-                "return_info": return_info,
-                "timestamp": time.time(),
-            },
-        }
-        await self.message_sender.send_message(message)
-        logger.critical(f"[TaskScheduler] Final job status published: {item.job_id} - {status}")
     # 业务逻辑处理方法
     async def handle_query_state(self, data: Dict[str, str]) -> None:
@@ -341,9 +297,14 @@ class TaskScheduler:
         # 如果设备空闲立即响应free状态
         if free:
             await self._publish_device_action_state(
-                device_id, action_name, task_id, job_id, "query_action_status", True, 0.0
+                device_id, action_name, task_id, job_id, "query_action_status", True, 0
             )
-            logger.critical(f"[TaskScheduler] Device {device_id}/{action_name} is free, responded immediately")
+            logger.error(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is free, responded immediately")
+            host_node = HostNode.get_instance(0)
+            if not host_node:
+                logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {job_id}")
+                return
+            host_node._device_action_status[device_action_key].job_ids[job_id] = time.time()
             return
         # 设备忙碌时,检查是否已有相同的轮询任务
@@ -371,14 +332,14 @@ class TaskScheduler:
                     task_id=task_id,
                     job_id=job_id,
                     device_action_key=device_action_key,
-                    next_run_time=time.time() + 10.0, # 10秒后执行
+                    next_run_time=time.time() + 10, # 10秒后执行
                 )
                 self.action_queue.append(queue_item)
-                logger.critical(f"[TaskScheduler] Device {device_id}/{action_name} is busy, added to polling queue")
+                logger.error(f"[TaskScheduler] {job_id} Device {device_id}/{action_name} is busy, added to polling queue {action_jobs}")
                 # 立即发送busy状态
                 await self._publish_device_action_state(
-                    device_id, action_name, task_id, job_id, "query_action_status", False, 10.0
+                    device_id, action_name, task_id, job_id, "query_action_status", False, 10
                 )
         else:
             logger.warning("[TaskScheduler] Action queue not available")
@@ -404,22 +365,13 @@ class TaskScheduler:
             "callback_started": False, # 标记callback是否已启动
         }
-        # 创建取消事件
+        # 创建取消事件todo要移动到query_state中
         self.cancel_events[req.job_id] = asyncio.Event()
-        logger.critical(f"[TaskScheduler] Created cancel event for job_id: {req.job_id}")
         try:
-            host_node = HostNode.get_instance(0)
-            if not host_node:
-                logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {req.job_id}")
-                return
-            host_node._device_action_status[device_action_key].job_ids[req.job_id] = time.time()
-            logger.critical(f"[TaskScheduler] Job registered in HostNode: {req.job_id}")
             # 启动callback定时发送
             await self._start_job_callback(
-                req.job_id, req.device_id, req.action, data.get("task_id", ""), device_action_key
+                req.job_id, req.device_id, req.action, req.task_id, device_action_key
             )
             # 创建兼容HostNode的QueueItem对象
@@ -427,20 +379,21 @@ class TaskScheduler:
                 task_type="job_call_back_status",
                 device_id=req.device_id,
                 action_name=req.action,
-                task_id=data.get("task_id", ""),
+                task_id=req.task_id,
                 job_id=req.job_id,
                 device_action_key=device_action_key,
                 next_run_time=time.time(),
             )
-            logger.critical(f"[TaskScheduler] Sending goal to HostNode for job_id: {req.job_id}")
+            host_node = HostNode.get_instance(0)
+            if not host_node:
+                logger.error(f"[TaskScheduler] HostNode instance not available for job_id: {req.job_id}")
+                return
             host_node.send_goal(
                 job_queue_item,
                 action_type=req.action_type,
                 action_kwargs=req.action_args,
                 server_info=req.server_info,
             )
-            logger.critical(f"[TaskScheduler] Goal sent successfully for job_id: {req.job_id}")
         except Exception as e:
             logger.error(f"[TaskScheduler] Exception during job start for job_id {req.job_id}: {str(e)}")
             traceback.print_exc()
@@ -518,7 +471,6 @@ class TaskScheduler:
         self, job_id: str, device_id: str, action_name: str, task_id: str, device_action_key: str
     ) -> None:
         """启动job的callback定时发送"""
-        logger.critical(f"[TaskScheduler] Starting job callback for job_id: {job_id}")
         if job_id not in self.active_jobs:
             logger.warning(f"[TaskScheduler] Job not found in active jobs when starting callback: {job_id}")
             return
@@ -530,7 +482,6 @@ class TaskScheduler:
         # 标记callback已启动
         self.active_jobs[job_id]["callback_started"] = True
-        logger.critical(f"[TaskScheduler] Marked callback as started for job_id: {job_id}")
         # 将job_call_back_status任务放入队列
         queue_item = QueueItem(
@@ -540,12 +491,11 @@ class TaskScheduler:
             task_id=task_id,
             job_id=job_id,
             device_action_key=device_action_key,
-            next_run_time=time.time() + 10.0, # 10秒后开始报送
+            next_run_time=time.time() + 10, # 10秒后开始报送
         )
         if self.action_queue is not None:
             with self.action_queue_lock:
                 self.action_queue.append(queue_item)
-            logger.critical(f"[TaskScheduler] Added job callback to queue for job_id: {job_id}")
         else:
             logger.warning(f"[TaskScheduler] Action queue not available for job callback: {job_id}")
@@ -575,23 +525,8 @@ class TaskScheduler:
         # 发送最终的callback状态
         await self._publish_device_action_state(
-            device_id, action_name, task_id, job_id, "job_call_back_status", True, 0.0
+            device_id, action_name, task_id, job_id, "job_call_back_status", True, 0
         )
-        logger.critical(f"[TaskScheduler] Published final callback status for job_id: {job_id}")
-        # 发送最终的job状态
-        error_queue_item = QueueItem(
-            task_type="job_call_back_status",
-            device_id=device_id,
-            action_name=action_name,
-            task_id=task_id,
-            job_id=job_id,
-            device_action_key=device_action_key,
-            next_run_time=time.time(),
-        )
-        await self._publish_final_job_status({}, error_queue_item, final_status, return_info)
-        logger.critical(f"[TaskScheduler] Published final job status for job_id: {job_id}")
         logger.critical(
             f"[TaskScheduler] Completed stopping job callback for {job_id} with final status: {final_status}"
         )
@@ -601,28 +536,22 @@ class TaskScheduler:
         self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[str] = None
     ) -> None:
         """发布作业状态拦截最终结果给HostNode调用的接口"""
-        logger.critical(f"[TaskScheduler] Publishing job status for job_id: {item.job_id} - status: {status}")
         if not self.message_sender.is_connected():
             logger.warning(f"[TaskScheduler] Not connected, cannot publish job status for job_id: {item.job_id}")
             return
         # 拦截最终结果状态
-        if status in ["completed", "failed"]:
+        if status in ["success", "failed"]:
+            host_node = HostNode.get_instance(0)
+            if host_node:
+                host_node._device_action_status[item.device_action_key].job_ids.pop(item.job_id)
             logger.critical(f"[TaskScheduler] Intercepting final status for job_id: {item.job_id} - {status}")
             # 如果是最终状态通过_stop_job_callback处理
-            try:
-                loop = asyncio.get_event_loop()
-                loop.create_task(self._stop_job_callback(item.job_id, status, return_info))
-                logger.critical(f"[TaskScheduler] Scheduled final callback stop for job_id: {item.job_id}")
-            except RuntimeError:
-                # 如果没有运行的事件循环,创建一个
-                asyncio.run(self._stop_job_callback(item.job_id, status, return_info))
-                logger.critical(f"[TaskScheduler] Executed final callback stop for job_id: {item.job_id}")
+            if self.message_sender.event_loop:
+                asyncio.run_coroutine_threadsafe(self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop).result()
+                logger.critical(f"[TaskScheduler] Scheduled final callback stop for job_id: {item.job_id}")
             logger.critical(f"[TaskScheduler] Intercepted final job status: {item.job_id} - {status}")
-            return
-        # 对于running状态正常发布
-        logger.critical(f"[TaskScheduler] Publishing running status for job_id: {item.job_id} - {status}")
         message = {
             "action": "job_status",
             "data": {
@@ -639,7 +568,6 @@ class TaskScheduler:
         try:
             loop = asyncio.get_event_loop()
             loop.create_task(self.message_sender.send_message(message))
-            logger.critical(f"[TaskScheduler] Scheduled message send for job_id: {item.job_id} - {status}")
         except RuntimeError:
             asyncio.run(self.message_sender.send_message(message))
             logger.critical(f"[TaskScheduler] Executed message send for job_id: {item.job_id} - {status}")
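The hunk at lines 536-557 above replaces the get_event_loop()/create_task and asyncio.run fallbacks with asyncio.run_coroutine_threadsafe, which hands the coroutine to the sender's event loop running in another thread and blocks until it completes. A minimal standalone sketch of that standard-library pattern (worker_loop and stop_callback are illustrative names, not repository code):

# Standalone sketch of the run_coroutine_threadsafe pattern adopted above.
import asyncio
import threading

# An event loop running forever in a background thread, standing in for the sender's loop.
worker_loop = asyncio.new_event_loop()
threading.Thread(target=worker_loop.run_forever, daemon=True).start()

async def stop_callback(job_id: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for the real async cleanup
    return f"stopped {job_id}"

# From synchronous code on another thread: schedule the coroutine on the running
# loop and block for its result.
future = asyncio.run_coroutine_threadsafe(stop_callback("job-1"), worker_loop)
print(future.result(timeout=5))

Blocking on .result() keeps the interception synchronous and also propagates any exception raised inside the coroutine back to the calling thread.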
@@ -717,9 +645,9 @@ class WebSocketClient(BaseCommunicationClient):
         else:
             scheme = "ws"
         if ":" in parsed.netloc and parsed.port is not None:
-            self.websocket_url = f"{scheme}://{parsed.hostname}:{parsed.port + 1}/api/v1/ws/lab"
+            self.websocket_url = f"{scheme}://{parsed.hostname}:{parsed.port + 1}/api/v1/ws/schedule"
         else:
-            self.websocket_url = f"{scheme}://{parsed.netloc}/api/v1/ws/lab"
+            self.websocket_url = f"{scheme}://{parsed.netloc}/api/v1/ws/schedule"
         logger.debug(f"[WebSocket] URL: {self.websocket_url}")
     # 连接管理方法
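The endpoint moves from /api/v1/ws/lab to /api/v1/ws/schedule while the derivation stays the same: reuse the HTTP host, bump the port by one when an explicit port is given, and pick the scheme accordingly. A small sketch of that derivation (build_ws_url and http_url are illustrative; the wss branch is an assumption, since only the ws branch is visible in the hunk):

# Illustrative sketch of deriving the websocket URL from the HTTP endpoint.
from urllib.parse import urlparse

def build_ws_url(http_url: str) -> str:
    parsed = urlparse(http_url)
    scheme = "wss" if parsed.scheme == "https" else "ws"  # assumed; only the "ws" branch appears above
    if ":" in parsed.netloc and parsed.port is not None:
        # Explicit port: the websocket service is assumed to listen one port above the HTTP API.
        return f"{scheme}://{parsed.hostname}:{parsed.port + 1}/api/v1/ws/schedule"
    return f"{scheme}://{parsed.netloc}/api/v1/ws/schedule"

print(build_ws_url("http://127.0.0.1:8080"))    # -> ws://127.0.0.1:8081/api/v1/ws/schedule
print(build_ws_url("https://lab.example.com"))  # -> wss://lab.example.com/api/v1/ws/schedule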
@@ -934,10 +862,8 @@ class WebSocketClient(BaseCommunicationClient):
         self, feedback_data: dict, item: "QueueItem", status: str, return_info: Optional[str] = None
     ) -> None:
         """发布作业状态转发给TaskScheduler"""
-        logger.critical(f"[WebSocket] Received job status for job_id: {item.job_id} - status: {status}")
         if self.task_scheduler:
             self.task_scheduler.publish_job_status(feedback_data, item, status, return_info)
-            logger.critical(f"[WebSocket] Forwarded job status to TaskScheduler for job_id: {item.job_id}")
         else:
             logger.warning(f"[WebSocket] Task scheduler not available for job status: {item.job_id}")

View File

@@ -233,12 +233,12 @@ class HostNode(BaseROS2DeviceNode):
                 client: HTTPClient = bridge
                 resource_start_time = time.time()
-                # resource_add_res = client.resource_add(add_schema(resource_with_parent_name), False)
+                resource_add_res = client.resource_add(add_schema(resource_with_parent_name), False)
                 resource_end_time = time.time()
                 self.lab_logger().info(
                     f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms"
                 )
-                # resource_add_res = client.resource_edge_add(self.resources_edge_config, False)
+                resource_add_res = client.resource_edge_add(self.resources_edge_config, False)
                 resource_edge_end_time = time.time()
                 self.lab_logger().info(
                     f"[Host Node-Resource] 物料关系上传 {round(resource_edge_end_time - resource_end_time, 5) * 1000} ms"
@@ -707,7 +707,6 @@ class HostNode(BaseROS2DeviceNode):
     def get_result_callback(self, item: "QueueItem", action_id: str, future) -> None:
         """获取结果回调"""
         job_id = item.job_id
-        self._device_action_status[f"/devices/{item.device_id}/{item.action_name}"].job_ids.pop(item.job_id)
         result_msg = future.result().result
         result_data = convert_from_ros_msg(result_msg)
         status = "success"
@@ -738,16 +737,6 @@ class HostNode(BaseROS2DeviceNode):
         for bridge in self.bridges:
             if hasattr(bridge, "publish_job_status"):
                 bridge.publish_job_status(result_data, item, status, return_info_str)
-            # 如果是WebSocket客户端通知任务完成
-            if hasattr(bridge, "_finish_job_callback_status"):
-                import asyncio
-                free = True # 任务完成,设备空闲
-                need_more = 0.0 # 任务结束,不需要更多时间
-                try:
-                    asyncio.create_task(bridge._finish_job_callback_status(job_id, free, need_more))
-                except Exception as e:
-                    self.lab_logger().error(f"[Host Node] Error finishing job callback status: {e}")
     def cancel_goal(self, goal_uuid: str) -> None:
         """取消目标"""