修复可能的web template找不到的问题

新增联网获取json启动
删除非-g传入启动json的方式
兼容传参参数名短横线与下划线
This commit is contained in:
Xuwznln
2025-07-31 14:25:40 +08:00
parent c6ac32c115
commit 2b3cec5640
9 changed files with 129 additions and 48 deletions

View File

@@ -1,6 +1,5 @@
import argparse
import asyncio
import json
import os
import signal
import sys
@@ -10,7 +9,7 @@ from copy import deepcopy
import yaml
from unilabos.resources.graphio import tree_to_list, modify_to_backend_format
from unilabos.resources.graphio import modify_to_backend_format
# 首先添加项目根目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
@@ -18,7 +17,7 @@ unilabos_dir = os.path.dirname(os.path.dirname(current_dir))
if unilabos_dir not in sys.path:
sys.path.append(unilabos_dir)
from unilabos.config.config import load_config, BasicConfig, _update_config_from_env
from unilabos.config.config import load_config, BasicConfig
from unilabos.utils.banner_print import print_status, print_unilab_banner
@@ -37,12 +36,22 @@ def load_config_from_file(config_path, override_labid=None):
load_config(config_path, override_labid)
def convert_argv_dashes_to_underscores(args: argparse.ArgumentParser):
# easier for user input, easier for dev search code
option_strings = list(args._option_string_actions.keys())
for i, arg in enumerate(sys.argv):
for option_string in option_strings:
if arg.startswith(option_string):
new_arg = arg[:2] + arg[2:len(option_string)].replace("-", "_") + arg[len(option_string):]
sys.argv[i] = new_arg
break
def parse_args():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description="Start Uni-Lab Edge server.")
parser.add_argument("-g", "--graph", help="Physical setup graph.")
parser.add_argument("-d", "--devices", help="Devices config file.")
parser.add_argument("-r", "--resources", help="Resources config file.")
# parser.add_argument("-d", "--devices", help="Devices config file.")
# parser.add_argument("-r", "--resources", help="Resources config file.")
parser.add_argument("-c", "--controllers", default=None, help="Controllers config file.")
parser.add_argument(
"--registry_path",
@@ -51,6 +60,12 @@ def parse_args():
action="append",
help="Path to the registry",
)
parser.add_argument(
"--working_dir",
type=str,
default=None,
help="Path to the working directory",
)
parser.add_argument(
"--backend",
choices=["ros", "simple", "automancer"],
@@ -92,12 +107,12 @@ def parse_args():
)
parser.add_argument(
"--disable_browser",
action='store_true',
action="store_true",
help="是否在启动时关闭信息页",
)
parser.add_argument(
"--2d_vis",
action='store_true',
action="store_true",
help="是否在pylabrobot实例启动时同时启动可视化",
)
parser.add_argument(
@@ -112,14 +127,15 @@ def parse_args():
default="",
help="实验室唯一ID也可通过环境变量 UNILABOS.MQCONFIG.LABID 设置或传入--config设置",
)
return parser.parse_args()
return parser
def main():
"""主函数"""
# 解析命令行参数
args = parse_args()
args_dict = vars(args)
convert_argv_dashes_to_underscores(args)
args_dict = vars(args.parse_args())
# 加载配置文件优先加载config然后从env读取
config_path = args_dict.get("config")
@@ -152,30 +168,30 @@ def main():
# 注册表
build_registry(args_dict["registry_path"])
resource_edge_info = []
devices_and_resources = None
if args_dict["graph"] is not None:
import unilabos.resources.graphio as graph_res
if args_dict["graph"] is None:
request_startup_json = http_client.request_startup_json()
if not request_startup_json:
print_status(
"未指定设备加载文件路径尝试从HTTP获取失败请检查网络或者使用-g参数指定设备加载文件路径", "error"
)
os._exit(1)
else:
print_status("联网获取设备加载文件成功", "info")
graph, data = read_node_link_json(request_startup_json)
else:
if args_dict["graph"].endswith(".json"):
graph, data = read_node_link_json(args_dict["graph"])
else:
graph, data = read_graphml(args_dict["graph"])
graph_res.physical_setup_graph = graph
resource_edge_info = modify_to_backend_format(data["links"])
devices_and_resources = dict_from_graph(graph_res.physical_setup_graph)
# args_dict["resources_config"] = initialize_resources(list(deepcopy(devices_and_resources).values()))
args_dict["resources_config"] = list(devices_and_resources.values())
args_dict["devices_config"] = dict_to_nested_dict(deepcopy(devices_and_resources), devices_only=False)
args_dict["graph"] = graph_res.physical_setup_graph
else:
if args_dict["devices"] is None or args_dict["resources"] is None:
print_status("Either graph or devices and resources must be provided.", "error")
sys.exit(1)
args_dict["devices_config"] = json.load(open(args_dict["devices"], encoding="utf-8"))
# args_dict["resources_config"] = initialize_resources(
# list(json.load(open(args_dict["resources"], encoding="utf-8")).values())
# )
args_dict["resources_config"] = list(json.load(open(args_dict["resources"], encoding="utf-8")).values())
import unilabos.resources.graphio as graph_res
graph_res.physical_setup_graph = graph
resource_edge_info = modify_to_backend_format(data["links"])
devices_and_resources = dict_from_graph(graph_res.physical_setup_graph)
# args_dict["resources_config"] = initialize_resources(list(deepcopy(devices_and_resources).values()))
args_dict["resources_config"] = list(devices_and_resources.values())
args_dict["devices_config"] = dict_to_nested_dict(deepcopy(devices_and_resources), devices_only=False)
args_dict["graph"] = graph_res.physical_setup_graph
print_status(f"{len(args_dict['resources_config'])} Resources loaded:", "info")
for i in args_dict["resources_config"]:
@@ -207,13 +223,22 @@ def main():
if args_dict["visual"] != "disable":
enable_rviz = args_dict["visual"] == "rviz"
if devices_and_resources is not None:
from unilabos.device_mesh.resource_visalization import ResourceVisualization # 此处开启后logger会变更为INFO有需要请调整
resource_visualization = ResourceVisualization(devices_and_resources, args_dict["resources_config"] ,enable_rviz=enable_rviz)
from unilabos.device_mesh.resource_visalization import (
ResourceVisualization,
) # 此处开启后logger会变更为INFO有需要请调整
resource_visualization = ResourceVisualization(
devices_and_resources, args_dict["resources_config"], enable_rviz=enable_rviz
)
args_dict["resources_mesh_config"] = resource_visualization.resource_model
start_backend(**args_dict)
server_thread = threading.Thread(target=start_server, kwargs=dict(
open_browser=not args_dict["disable_browser"], port=args_dict["port"],
))
server_thread = threading.Thread(
target=start_server,
kwargs=dict(
open_browser=not args_dict["disable_browser"],
port=args_dict["port"],
),
)
server_thread.start()
asyncio.set_event_loop(asyncio.new_event_loop())
resource_visualization.start()
@@ -221,10 +246,16 @@ def main():
time.sleep(1)
else:
start_backend(**args_dict)
start_server(open_browser=not args_dict["disable_browser"], port=args_dict["port"],)
start_server(
open_browser=not args_dict["disable_browser"],
port=args_dict["port"],
)
else:
start_backend(**args_dict)
start_server(open_browser=not args_dict["disable_browser"], port=args_dict["port"],)
start_server(
open_browser=not args_dict["disable_browser"],
port=args_dict["port"],
)
if __name__ == "__main__":

View File

@@ -3,7 +3,7 @@ HTTP客户端模块
提供与远程服务器通信的客户端功能只有host需要用
"""
import json
from typing import List, Dict, Any, Optional
import requests
@@ -170,6 +170,36 @@ class HTTPClient:
logger.error(f"注册资源失败: {response.status_code}, {response.text}")
return response
def request_startup_json(self) -> Optional[Dict[str, Any]]:
"""
请求启动配置
Args:
startup_json: 启动配置JSON数据
Returns:
Response: API响应对象
"""
response = requests.get(
f"{self.remote_addr}/lab/resource/graph_info/",
headers={"Authorization": f"lab {self.auth}"},
timeout=(3, 30),
)
if response.status_code != 200:
logger.error(f"请求启动配置失败: {response.status_code}, {response.text}")
else:
try:
with open("startup_config.json", "w", encoding="utf-8") as f:
f.write(response.text)
target_dict = json.loads(response.text)
if "data" in target_dict:
target_dict = target_dict["data"]
return target_dict
except json.JSONDecodeError as e:
logger.error(f"解析启动配置JSON失败: {str(e.args)}\n响应内容: {response.text}")
logger.error(f"响应内容: {response.text}")
return None
# 创建默认客户端实例
http_client = HTTPClient()