mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2026-02-13 11:15:12 +00:00
Compare commits
3 Commits
v0.10.1
...
0b56378287
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b56378287 | ||
|
|
51b47596ce | ||
|
|
42e8befec4 |
@@ -57,5 +57,5 @@ requirements:
|
|||||||
- robostack-staging::ros-humble-rosidl-default-runtime
|
- robostack-staging::ros-humble-rosidl-default-runtime
|
||||||
- robostack-staging::ros-humble-std-msgs
|
- robostack-staging::ros-humble-std-msgs
|
||||||
- robostack-staging::ros-humble-geometry-msgs
|
- robostack-staging::ros-humble-geometry-msgs
|
||||||
- robostack-staging::ros2-distro-mutex=0.6.*
|
# - robostack-staging::ros2-distro-mutex=0.6.*
|
||||||
- sel(osx and x86_64): __osx >={{ MACOSX_DEPLOYMENT_TARGET|default('10.14') }}
|
- sel(osx and x86_64): __osx >={{ MACOSX_DEPLOYMENT_TARGET|default('10.14') }}
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ from unilabos.utils.banner_print import print_status, print_unilab_banner
|
|||||||
|
|
||||||
def load_config_from_file(config_path, override_labid=None):
|
def load_config_from_file(config_path, override_labid=None):
|
||||||
if config_path is None:
|
if config_path is None:
|
||||||
config_path = os.environ.get("UNILABOS.BASICCONFIG.CONFIG_PATH", None)
|
config_path = os.environ.get("UNILABOS_BASICCONFIG_CONFIG_PATH", None)
|
||||||
if config_path:
|
if config_path:
|
||||||
if not os.path.exists(config_path):
|
if not os.path.exists(config_path):
|
||||||
print_status(f"配置文件 {config_path} 不存在", "error")
|
print_status(f"配置文件 {config_path} 不存在", "error")
|
||||||
@@ -126,7 +126,7 @@ def parse_args():
|
|||||||
"--labid",
|
"--labid",
|
||||||
type=str,
|
type=str,
|
||||||
default="",
|
default="",
|
||||||
help="实验室唯一ID,也可通过环境变量 UNILABOS.MQCONFIG.LABID 设置或传入--config设置",
|
help="实验室唯一ID,也可通过环境变量 UNILABOS_MQCONFIG_LABID 设置或传入--config设置",
|
||||||
)
|
)
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
@@ -140,10 +140,24 @@ def main():
|
|||||||
|
|
||||||
# 加载配置文件,优先加载config,然后从env读取
|
# 加载配置文件,优先加载config,然后从env读取
|
||||||
config_path = args_dict.get("config")
|
config_path = args_dict.get("config")
|
||||||
working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
|
if os.getcwd().endswith("unilabos_data"):
|
||||||
if not config_path and (not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py"))):
|
working_dir = os.path.abspath(os.getcwd())
|
||||||
print_status(f"当前未指定config路径,非第一次使用请通过 --config 传入 local_config.py 文件路径", "info")
|
else:
|
||||||
print_status(f"您是否为第一次使用?并将当前文件路径 {working_dir} 作为工作目录? (Y/n)", "info")
|
working_dir = os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
|
||||||
|
if args_dict.get("working_dir"):
|
||||||
|
working_dir = args_dict.get("working_dir")
|
||||||
|
if config_path and not os.path.exists(config_path):
|
||||||
|
config_path = os.path.join(working_dir, "local_config.py")
|
||||||
|
if not os.path.exists(config_path):
|
||||||
|
print_status(
|
||||||
|
f"当前工作目录 {working_dir} 未找到local_config.py,请通过 --config 传入 local_config.py 文件路径",
|
||||||
|
"error")
|
||||||
|
os._exit(1)
|
||||||
|
elif os.path.exists(working_dir) and os.path.exists(os.path.join(working_dir, "local_config.py")):
|
||||||
|
config_path = os.path.join(working_dir, "local_config.py")
|
||||||
|
elif not config_path and (not os.path.exists(working_dir) or not os.path.exists(os.path.join(working_dir, "local_config.py"))):
|
||||||
|
print_status(f"未指定config路径,可通过 --config 传入 local_config.py 文件路径", "info")
|
||||||
|
print_status(f"您是否为第一次使用?并将当前路径 {working_dir} 作为工作目录? (Y/n)", "info")
|
||||||
if input() != "n":
|
if input() != "n":
|
||||||
os.makedirs(working_dir, exist_ok=True)
|
os.makedirs(working_dir, exist_ok=True)
|
||||||
config_path = os.path.join(working_dir, "local_config.py")
|
config_path = os.path.join(working_dir, "local_config.py")
|
||||||
@@ -153,16 +167,8 @@ def main():
|
|||||||
os._exit(1)
|
os._exit(1)
|
||||||
else:
|
else:
|
||||||
os._exit(1)
|
os._exit(1)
|
||||||
else:
|
|
||||||
working_dir = args_dict.get("working_dir") or os.path.abspath(os.path.join(os.getcwd(), "unilabos_data"))
|
|
||||||
if working_dir:
|
|
||||||
if config_path and not os.path.exists(config_path):
|
|
||||||
config_path = os.path.join(working_dir, "local_config.py")
|
|
||||||
if not os.path.exists(config_path):
|
|
||||||
print_status(f"当前工作目录 {working_dir} 未找到local_config.py,请通过 --config 传入 local_config.py 文件路径", "error")
|
|
||||||
os._exit(1)
|
|
||||||
print_status(f"当前工作目录为 {working_dir}", "info")
|
|
||||||
# 加载配置文件
|
# 加载配置文件
|
||||||
|
print_status(f"当前工作目录为 {working_dir}", "info")
|
||||||
load_config_from_file(config_path, args_dict["labid"])
|
load_config_from_file(config_path, args_dict["labid"])
|
||||||
|
|
||||||
# 设置BasicConfig参数
|
# 设置BasicConfig参数
|
||||||
@@ -204,10 +210,11 @@ def main():
|
|||||||
print_status("联网获取设备加载文件成功", "info")
|
print_status("联网获取设备加载文件成功", "info")
|
||||||
graph, data = read_node_link_json(request_startup_json)
|
graph, data = read_node_link_json(request_startup_json)
|
||||||
else:
|
else:
|
||||||
if args_dict["graph"].endswith(".json"):
|
file_path = args_dict["graph"]
|
||||||
graph, data = read_node_link_json(args_dict["graph"])
|
if file_path.endswith(".json"):
|
||||||
|
graph, data = read_node_link_json(file_path)
|
||||||
else:
|
else:
|
||||||
graph, data = read_graphml(args_dict["graph"])
|
graph, data = read_graphml(file_path)
|
||||||
import unilabos.resources.graphio as graph_res
|
import unilabos.resources.graphio as graph_res
|
||||||
|
|
||||||
graph_res.physical_setup_graph = graph
|
graph_res.physical_setup_graph = graph
|
||||||
|
|||||||
@@ -4,11 +4,12 @@ HTTP客户端模块
|
|||||||
提供与远程服务器通信的客户端功能,只有host需要用
|
提供与远程服务器通信的客户端功能,只有host需要用
|
||||||
"""
|
"""
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
from typing import List, Dict, Any, Optional
|
from typing import List, Dict, Any, Optional
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from unilabos.utils.log import info
|
from unilabos.utils.log import info
|
||||||
from unilabos.config.config import MQConfig, HTTPConfig
|
from unilabos.config.config import MQConfig, HTTPConfig, BasicConfig
|
||||||
from unilabos.utils import logger
|
from unilabos.utils import logger
|
||||||
|
|
||||||
|
|
||||||
@@ -189,7 +190,7 @@ class HTTPClient:
|
|||||||
logger.error(f"请求启动配置失败: {response.status_code}, {response.text}")
|
logger.error(f"请求启动配置失败: {response.status_code}, {response.text}")
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
with open("startup_config.json", "w", encoding="utf-8") as f:
|
with open(os.path.join(BasicConfig.working_dir, "startup_config.json"), "w", encoding="utf-8") as f:
|
||||||
f.write(response.text)
|
f.write(response.text)
|
||||||
target_dict = json.loads(response.text)
|
target_dict = json.loads(response.text)
|
||||||
if "data" in target_dict:
|
if "data" in target_dict:
|
||||||
|
|||||||
@@ -109,13 +109,13 @@ def _update_config_from_module(module, override_labid: str):
|
|||||||
|
|
||||||
|
|
||||||
def _update_config_from_env():
|
def _update_config_from_env():
|
||||||
prefix = "UNILABOS."
|
prefix = "UNILABOS_"
|
||||||
for env_key, env_value in os.environ.items():
|
for env_key, env_value in os.environ.items():
|
||||||
if not env_key.startswith(prefix):
|
if not env_key.startswith(prefix):
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
key_path = env_key[len(prefix):] # Remove UNILAB_ prefix
|
key_path = env_key[len(prefix):] # Remove UNILAB_ prefix
|
||||||
class_field = key_path.upper().split(".", 1)
|
class_field = key_path.upper().split("_", 1)
|
||||||
if len(class_field) != 2:
|
if len(class_field) != 2:
|
||||||
logger.warning(f"[ENV] 环境变量格式不正确:{env_key}")
|
logger.warning(f"[ENV] 环境变量格式不正确:{env_key}")
|
||||||
continue
|
continue
|
||||||
@@ -163,12 +163,12 @@ def _update_config_from_env():
|
|||||||
def load_config(config_path=None, override_labid=None):
|
def load_config(config_path=None, override_labid=None):
|
||||||
# 如果提供了配置文件路径,从该文件导入配置
|
# 如果提供了配置文件路径,从该文件导入配置
|
||||||
if config_path:
|
if config_path:
|
||||||
_update_config_from_env() # 允许config_path被env设定后读取
|
env_config_path = os.environ.get("UNILABOS_BASICCONFIG_CONFIG_PATH")
|
||||||
|
config_path = env_config_path if env_config_path else config_path
|
||||||
BasicConfig.config_path = os.path.abspath(os.path.dirname(config_path))
|
BasicConfig.config_path = os.path.abspath(os.path.dirname(config_path))
|
||||||
if not os.path.exists(config_path):
|
if not os.path.exists(config_path):
|
||||||
logger.error(f"[ENV] 配置文件 {config_path} 不存在")
|
logger.error(f"[ENV] 配置文件 {config_path} 不存在")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
module_name = "lab_" + os.path.basename(config_path).replace(".py", "")
|
module_name = "lab_" + os.path.basename(config_path).replace(".py", "")
|
||||||
spec = importlib.util.spec_from_file_location(module_name, config_path)
|
spec = importlib.util.spec_from_file_location(module_name, config_path)
|
||||||
@@ -179,6 +179,7 @@ def load_config(config_path=None, override_labid=None):
|
|||||||
spec.loader.exec_module(module) # type: ignore
|
spec.loader.exec_module(module) # type: ignore
|
||||||
_update_config_from_module(module, override_labid)
|
_update_config_from_module(module, override_labid)
|
||||||
logger.info(f"[ENV] 配置文件 {config_path} 加载成功")
|
logger.info(f"[ENV] 配置文件 {config_path} 加载成功")
|
||||||
|
_update_config_from_env()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"[ENV] 加载配置文件 {config_path} 失败")
|
logger.error(f"[ENV] 加载配置文件 {config_path} 失败")
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|||||||
@@ -8,9 +8,9 @@ class MQConfig:
|
|||||||
broker_url = ""
|
broker_url = ""
|
||||||
port = 1883
|
port = 1883
|
||||||
|
|
||||||
ca_file = "CA.crt"
|
ca_file = "./CA.crt"
|
||||||
cert_file = "lab.crt"
|
cert_file = "./lab.crt"
|
||||||
key_file = "lab.key"
|
key_file = "./lab.key"
|
||||||
|
|
||||||
# HTTP配置
|
# HTTP配置
|
||||||
class HTTPConfig:
|
class HTTPConfig:
|
||||||
|
|||||||
Reference in New Issue
Block a user