feat: add sk & ak

This commit is contained in:
Xuwznln
2025-08-20 21:23:08 +08:00
parent 4b7bde6be5
commit 02c79363c1
23 changed files with 407 additions and 336 deletions

View File

@@ -134,6 +134,18 @@ def parse_args():
default="",
help="实验室唯一ID也可通过环境变量 UNILABOS_MQCONFIG_LABID 设置或传入--config设置",
)
parser.add_argument(
"--ak",
type=str,
default="",
help="实验室请求的ak",
)
parser.add_argument(
"--sk",
type=str,
default="",
help="实验室请求的sk",
)
parser.add_argument(
"--skip_env_check",
action="store_true",
@@ -211,6 +223,8 @@ def main():
print_status("远程资源不存在,本地将进行首次上报!", "info")
# 设置BasicConfig参数
BasicConfig.ak = args_dict.get("ak", "")
BasicConfig.sk = args_dict.get("sk", "")
BasicConfig.working_dir = working_dir
BasicConfig.is_host_mode = not args_dict.get("without_host", False)
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)

View File

@@ -1,44 +1,59 @@
import argparse
import json
import time
from unilabos.config.config import BasicConfig
from unilabos.registry.registry import build_registry
from unilabos.app.main import load_config_from_file
from unilabos.utils.log import logger
from unilabos.utils.type_check import TypeEncoder
def register_devices_and_resources(mqtt_client, lab_registry):
"""
注册设备和资源到 MQTT
"""
logger.info("[UniLab Register] 开始注册设备和资源...")
# 注册设备信息
for device_info in lab_registry.obtain_registry_device_info():
mqtt_client.publish_registry(device_info["id"], device_info, False)
logger.debug(f"[UniLab Register] 注册设备: {device_info['id']}")
# # 注册资源信息
# for resource_info in lab_registry.obtain_registry_resource_info():
# mqtt_client.publish_registry(resource_info["id"], resource_info, False)
# logger.debug(f"[UniLab Register] 注册资源: {resource_info['id']}")
# 注册资源信息 - 使用HTTP方式
from unilabos.app.web.client import http_client
logger.info("[UniLab Register] 开始注册设备和资源...")
if BasicConfig.auth_secret():
# 注册设备信息
devices_to_register = {}
for device_info in lab_registry.obtain_registry_device_info():
devices_to_register[device_info["id"]] = json.loads(json.dumps(device_info, ensure_ascii=False, cls=TypeEncoder))
logger.debug(f"[UniLab Register] 收集设备: {device_info['id']}")
resources_to_register = {}
for resource_info in lab_registry.obtain_registry_resource_info():
resources_to_register[resource_info["id"]] = resource_info
logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}")
print("[UniLab Register] 设备注册", http_client.resource_registry({"resources": list(devices_to_register.values())}).text)
print("[UniLab Register] 资源注册", http_client.resource_registry({"resources": list(resources_to_register.values())}).text)
else:
# 注册设备信息
for device_info in lab_registry.obtain_registry_device_info():
mqtt_client.publish_registry(device_info["id"], device_info, False)
logger.debug(f"[UniLab Register] 注册设备: {device_info['id']}")
resources_to_register = {}
for resource_info in lab_registry.obtain_registry_resource_info():
resources_to_register[resource_info["id"]] = resource_info
logger.debug(f"[UniLab Register] 准备注册资源: {resource_info['id']}")
# # 注册资源信息
# for resource_info in lab_registry.obtain_registry_resource_info():
# mqtt_client.publish_registry(resource_info["id"], resource_info, False)
# logger.debug(f"[UniLab Register] 注册资源: {resource_info['id']}")
if resources_to_register:
start_time = time.time()
response = http_client.resource_registry(resources_to_register)
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功通过HTTP注册 {len(resources_to_register)} 个资源 {cost_time}ms")
else:
logger.error(f"[UniLab Register] HTTP注册资源失败: {response.status_code}, {response.text} {cost_time}ms")
resources_to_register = {}
for resource_info in lab_registry.obtain_registry_resource_info():
resources_to_register[resource_info["id"]] = resource_info
logger.debug(f"[UniLab Register] 准备注册资源: {resource_info['id']}")
if resources_to_register:
start_time = time.time()
response = http_client.resource_registry(resources_to_register)
cost_time = time.time() - start_time
if response.status_code in [200, 201]:
logger.info(f"[UniLab Register] 成功通过HTTP注册 {len(resources_to_register)} 个资源 {cost_time}ms")
else:
logger.error(f"[UniLab Register] HTTP注册资源失败: {response.status_code}, {response.text} {cost_time}ms")
logger.info("[UniLab Register] 设备和资源注册完成.")
@@ -60,6 +75,18 @@ def main():
default=None,
help="配置文件路径,支持.py格式的Python配置文件",
)
parser.add_argument(
"--ak",
type=str,
default="",
help="实验室请求的ak",
)
parser.add_argument(
"--sk",
type=str,
default="",
help="实验室请求的sk",
)
parser.add_argument(
"--complete_registry",
action="store_true",
@@ -68,6 +95,8 @@ def main():
)
args = parser.parse_args()
load_config_from_file(args.config)
BasicConfig.ak = args.ak
BasicConfig.sk = args.sk
# 构建注册表
build_registry(args.registry, args.complete_registry, True)
from unilabos.app.mq import mqtt_client

View File

@@ -15,6 +15,7 @@ from unilabos.utils import logger
class HTTPClient:
"""HTTP客户端用于与远程服务器通信"""
backend_go = False # 是否使用Go后端
def __init__(self, remote_addr: Optional[str] = None, auth: Optional[str] = None) -> None:
"""
@@ -28,7 +29,13 @@ class HTTPClient:
if auth is not None:
self.auth = auth
else:
self.auth = MQConfig.lab_id
auth_secret = BasicConfig.auth_secret()
if auth_secret:
self.auth = auth_secret
self.backend_go = True
info(f"正在使用ak sk作为授权信息 {auth_secret}")
else:
self.auth = MQConfig.lab_id
info(f"HTTPClient 初始化完成: remote_addr={self.remote_addr}")
def resource_edge_add(self, resources: List[Dict[str, Any]], database_process_later: bool) -> requests.Response:
@@ -43,7 +50,8 @@ class HTTPClient:
"""
database_param = 1 if database_process_later else 0
response = requests.post(
f"{self.remote_addr}/lab/resource/edge/batch_create/?database_process_later={database_param}",
f"{self.remote_addr}/lab/resource/edge/batch_create/?database_process_later={database_param}"
if not self.backend_go else f"{self.remote_addr}/lab/material/edge",
json=resources,
headers={"Authorization": f"lab {self.auth}"},
timeout=100,
@@ -63,13 +71,15 @@ class HTTPClient:
Response: API响应对象
"""
response = requests.post(
f"{self.remote_addr}/lab/resource/?database_process_later={1 if database_process_later else 0}",
json=resources,
f"{self.remote_addr}/lab/resource/?database_process_later={1 if database_process_later else 0}" if not self.backend_go else f"{self.remote_addr}/lab/material",
json=resources if not self.backend_go else {"nodes": resources},
headers={"Authorization": f"lab {self.auth}"},
timeout=100,
)
if response.status_code != 200:
logger.error(f"添加物料失败: {response.text}")
elif self.backend_go:
logger.info(f"添加物料 {response.text}")
return response
def resource_get(self, id: str, with_children: bool = False) -> Dict[str, Any]:
@@ -84,7 +94,7 @@ class HTTPClient:
Dict: 返回的资源数据
"""
response = requests.get(
f"{self.remote_addr}/lab/resource/?edge_format=1",
f"{self.remote_addr}/lab/resource/?edge_format=1" if not self.backend_go else f"{self.remote_addr}/lab/material",
params={"id": id, "with_children": with_children},
headers={"Authorization": f"lab {self.auth}"},
timeout=20,
@@ -151,18 +161,18 @@ class HTTPClient:
)
return response
def resource_registry(self, registry_data: Dict[str, Any]) -> requests.Response:
def resource_registry(self, registry_data: Dict[str, Any] | List[Dict[str, Any]]) -> requests.Response:
"""
注册资源到服务器
Args:
registry_data: 注册表数据,格式为 {resource_id: resource_info}
registry_data: 注册表数据,格式为 {resource_id: resource_info} / [{resource_info}]
Returns:
Response: API响应对象
"""
response = requests.post(
f"{self.remote_addr}/lab/registry/",
f"{self.remote_addr}/lab/registry/" if not self.backend_go else f"{self.remote_addr}/lab/resource",
json=registry_data,
headers={"Authorization": f"lab {self.auth}"},
timeout=30,