修复event loop错误

This commit is contained in:
Xuwznln
2025-09-04 17:11:50 +08:00
parent 0b56efc89d
commit 4e52c7d2f4

View File

@@ -558,10 +558,9 @@ class TaskScheduler:
with self.immediate_execution_lock: with self.immediate_execution_lock:
self.immediate_execution_flags[item.device_action_key] = time.time() + 3 self.immediate_execution_flags[item.device_action_key] = time.time() + 3
# 如果是最终状态通过_stop_job_callback处理 # 如果是最终状态通过_stop_job_callback处理
if self.message_sender.event_loop: asyncio.run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe( self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop
self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop ).result()
).result()
# 执行结果信息上传 # 执行结果信息上传
message = { message = {
"action": "job_status", "action": "job_status",
@@ -576,11 +575,9 @@ class TaskScheduler:
"timestamp": time.time(), "timestamp": time.time(),
}, },
} }
try: asyncio.run_coroutine_threadsafe(
loop = asyncio.get_event_loop() self.message_sender.send_message(message), self.message_sender.event_loop
loop.create_task(self.message_sender.send_message(message)) ).result()
except RuntimeError:
asyncio.run(self.message_sender.send_message(message))
logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore
@@ -616,7 +613,7 @@ class WebSocketClient(BaseCommunicationClient):
# WebSocket连接相关 # WebSocket连接相关
self.websocket = None self.websocket = None
self.connection_loop = None self.connection_loop = None
self.event_loop = None self.event_loop: asyncio.AbstractEventLoop = None # type: ignore
self.connection_thread = None self.connection_thread = None
self.is_running = False self.is_running = False
self.connected = False self.connected = False
@@ -826,13 +823,12 @@ class WebSocketClient(BaseCommunicationClient):
if host_node: if host_node:
host_node.handle_pong_response(pong_data) host_node.handle_pong_response(pong_data)
# 消息发送方法 # MessageSender接口实现
async def _send_message(self, message: Dict[str, Any]): async def send_message(self, message: Dict[str, Any]) -> None:
"""内部发送消息方法""" """内部发送消息方法"""
if not self.connected or not self.websocket: if not self.connected or not self.websocket:
logger.warning("[WebSocket] Not connected, cannot send message") logger.warning("[WebSocket] Not connected, cannot send message")
return return
try: try:
message_str = json.dumps(message, ensure_ascii=False) message_str = json.dumps(message, ensure_ascii=False)
await self.websocket.send(message_str) await self.websocket.send(message_str)
@@ -840,11 +836,6 @@ class WebSocketClient(BaseCommunicationClient):
except Exception as e: except Exception as e:
logger.error(f"[WebSocket] Failed to send message: {str(e)}") logger.error(f"[WebSocket] Failed to send message: {str(e)}")
# MessageSender接口实现
async def send_message(self, message: Dict[str, Any]) -> None:
"""发送消息TaskScheduler调用的接口"""
await self._send_message(message)
def is_connected(self) -> bool: def is_connected(self) -> bool:
"""检查是否已连接TaskScheduler调用的接口""" """检查是否已连接TaskScheduler调用的接口"""
return self.connected and not self.is_disabled return self.connected and not self.is_disabled
@@ -866,7 +857,7 @@ class WebSocketClient(BaseCommunicationClient):
}, },
} }
if self.event_loop: if self.event_loop:
asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop) asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop)
logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}") logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}")
def publish_job_status( def publish_job_status(
@@ -885,7 +876,7 @@ class WebSocketClient(BaseCommunicationClient):
return return
message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}} message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}}
if self.event_loop: if self.event_loop:
asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop) asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop)
logger.debug(f"[WebSocket] Ping sent: {ping_id}") logger.debug(f"[WebSocket] Ping sent: {ping_id}")
def cancel_goal(self, job_id: str) -> None: def cancel_goal(self, job_id: str) -> None: