Merge branch 'dev' into workstation_dev_new

This commit is contained in:
Xuwznln
2025-09-04 20:20:33 +08:00
3 changed files with 62 additions and 34 deletions

View File

@@ -258,7 +258,7 @@ def main():
print_unilab_banner(args_dict)
# 注册表
build_registry(args_dict["registry_path"], False, args_dict["upload_registry"])
lab_registry = build_registry(args_dict["registry_path"], False, args_dict["upload_registry"])
if args_dict["graph"] is None:
request_startup_json = http_client.request_startup_json()
if not request_startup_json:
@@ -279,6 +279,27 @@ def main():
graph_res.physical_setup_graph = graph
resource_edge_info = modify_to_backend_format(data["links"])
materials = lab_registry.obtain_registry_resource_info()
materials.extend(lab_registry.obtain_registry_device_info())
materials = {k["id"]: k for k in materials}
nodes = {k["id"]: k for k in data["nodes"]}
edge_info = len(resource_edge_info)
for ind, i in enumerate(resource_edge_info[::-1]):
source_node = nodes[i["source"]]
target_node = nodes[i["target"]]
source_handle = i["sourceHandle"]
target_handle = i["targetHandle"]
source_handler_keys = [h["handler_key"] for h in materials[source_node["class"]]["handles"] if h["io_type"] == 'source']
target_handler_keys = [h["handler_key"] for h in materials[target_node["class"]]["handles"] if h["io_type"] == 'target']
if not source_handle in source_handler_keys:
print_status(f"节点 {source_node['id']} 的source端点 {source_handle} 不存在,请检查,支持的端点 {source_handler_keys}", "error")
resource_edge_info.pop(edge_info - ind - 1)
continue
if not target_handle in target_handler_keys:
print_status(f"节点 {target_node['id']} 的target端点 {target_handle} 不存在,请检查,支持的端点 {target_handler_keys}", "error")
resource_edge_info.pop(edge_info - ind - 1)
continue
devices_and_resources = dict_from_graph(graph_res.physical_setup_graph)
# args_dict["resources_config"] = initialize_resources(list(deepcopy(devices_and_resources).values()))
args_dict["resources_config"] = list(devices_and_resources.values())

View File

@@ -58,10 +58,12 @@ class HTTPClient:
headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"},
timeout=100,
)
if self.backend_go and response.status_code == 200:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"添加物料关系失败: {response.text}")
if response.status_code != 200 and response.status_code != 201:
logger.error(f"添加物料关系失败: {response.status_code}, {response.text}")
elif self.backend_go:
logger.info(f"添加物料关系 {response.text}")
return response
def resource_add(self, resources: List[Dict[str, Any]], database_process_later: bool) -> requests.Response:
@@ -80,10 +82,12 @@ class HTTPClient:
headers={"Authorization": f"{'lab' if not self.backend_go else 'Lab'} {self.auth}"},
timeout=100,
)
if self.backend_go and response.status_code == 200:
res = response.json()
if "code" in res and res["code"] != 0:
logger.error(f"添加物料失败: {response.text}")
if response.status_code != 200:
logger.error(f"添加物料失败: {response.text}")
elif self.backend_go:
logger.info(f"添加物料 {response.text}")
return response
def resource_get(self, id: str, with_children: bool = False) -> Dict[str, Any]:

View File

@@ -142,9 +142,9 @@ class TaskScheduler:
# 执行相应的任务
should_continue = False
if item.task_type == "query_action_status":
should_continue = await self._process_query_status_item(item)
should_continue = asyncio.run_coroutine_threadsafe(self._process_query_status_item(item), self.message_sender.event_loop).result()
elif item.task_type == "job_call_back_status":
should_continue = await self._process_job_callback_item(item)
should_continue = asyncio.run_coroutine_threadsafe(self._process_job_callback_item(item), self.message_sender.event_loop).result()
else:
logger.warning(f"[TaskScheduler] Unknown task type: {item.task_type}")
continue
@@ -558,10 +558,9 @@ class TaskScheduler:
with self.immediate_execution_lock:
self.immediate_execution_flags[item.device_action_key] = time.time() + 3
# 如果是最终状态通过_stop_job_callback处理
if self.message_sender.event_loop:
asyncio.run_coroutine_threadsafe(
self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop
).result()
asyncio.run_coroutine_threadsafe(
self._stop_job_callback(item.job_id, status, return_info), self.message_sender.event_loop
).result()
# 执行结果信息上传
message = {
"action": "job_status",
@@ -576,11 +575,9 @@ class TaskScheduler:
"timestamp": time.time(),
},
}
try:
loop = asyncio.get_event_loop()
loop.create_task(self.message_sender.send_message(message))
except RuntimeError:
asyncio.run(self.message_sender.send_message(message))
asyncio.run_coroutine_threadsafe(
self.message_sender.send_message(message), self.message_sender.event_loop
).result()
logger.trace(f"[TaskScheduler] Job status published: {item.job_id} - {status}") # type: ignore
@@ -616,7 +613,7 @@ class WebSocketClient(BaseCommunicationClient):
# WebSocket连接相关
self.websocket = None
self.connection_loop = None
self.event_loop = None
self.event_loop: asyncio.AbstractEventLoop = None # type: ignore
self.connection_thread = None
self.is_running = False
self.connected = False
@@ -625,6 +622,9 @@ class WebSocketClient(BaseCommunicationClient):
self.message_queue = asyncio.Queue() if not self.is_disabled else None
self.reconnect_count = 0
# 消息发送锁(解决并发写入问题)- 延迟初始化
self.send_lock = None
# 任务调度器
self.task_scheduler = None
@@ -709,6 +709,9 @@ class WebSocketClient(BaseCommunicationClient):
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)
# 在正确的事件循环中创建锁
self.send_lock = asyncio.Lock()
# 运行连接逻辑
self.event_loop.run_until_complete(self._connection_handler())
except Exception as e:
@@ -826,24 +829,26 @@ class WebSocketClient(BaseCommunicationClient):
if host_node:
host_node.handle_pong_response(pong_data)
# 消息发送方法
async def _send_message(self, message: Dict[str, Any]):
"""内部发送消息方法"""
# MessageSender接口实现
async def send_message(self, message: Dict[str, Any]) -> None:
"""内部发送消息方法,使用锁确保线程安全"""
if not self.connected or not self.websocket:
logger.warning("[WebSocket] Not connected, cannot send message")
return
try:
message_str = json.dumps(message, ensure_ascii=False)
await self.websocket.send(message_str)
logger.debug(f"[WebSocket] Message sent: {message['action']}")
except Exception as e:
logger.error(f"[WebSocket] Failed to send message: {str(e)}")
# 检查锁是否已初始化(在事件循环启动后才会创建)
if not self.send_lock:
logger.warning("[WebSocket] Send lock not initialized, cannot send message safely")
return
# MessageSender接口实现
async def send_message(self, message: Dict[str, Any]) -> None:
"""发送消息TaskScheduler调用的接口"""
await self._send_message(message)
message_str = json.dumps(message, ensure_ascii=False)
# 使用异步锁防止并发写入导致的竞态条件
async with self.send_lock:
try:
await self.websocket.send(message_str)
logger.debug(f"[WebSocket] Message sent: {message['action']}")
except Exception as e:
logger.error(f"[WebSocket] Failed to send message: {str(e)}")
def is_connected(self) -> bool:
"""检查是否已连接TaskScheduler调用的接口"""
@@ -865,8 +870,7 @@ class WebSocketClient(BaseCommunicationClient):
},
},
}
if self.event_loop:
asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop)
asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop).result()
logger.debug(f"[WebSocket] Device status published: {device_id}.{property_name}")
def publish_job_status(
@@ -884,8 +888,7 @@ class WebSocketClient(BaseCommunicationClient):
logger.warning("[WebSocket] Not connected, cannot send ping")
return
message = {"action": "ping", "data": {"ping_id": ping_id, "client_timestamp": timestamp}}
if self.event_loop:
asyncio.run_coroutine_threadsafe(self._send_message(message), self.event_loop)
asyncio.run_coroutine_threadsafe(self.send_message(message), self.event_loop).result()
logger.debug(f"[WebSocket] Ping sent: {ping_id}")
def cancel_goal(self, job_id: str) -> None: