7 Commits

Author SHA1 Message Date
Xuwznln
c35da65b15 fix resource_get param 2025-11-08 14:40:45 +08:00
Xuwznln
659cf05be6 fix json dumps 2025-11-08 12:08:46 +08:00
Xuwznln
3b8deb4d1d support name change during materials change 2025-11-08 12:08:45 +08:00
Xuwznln
c796615f9f enable slave mode 2025-11-07 21:15:05 +08:00
Xuwznln
a5bad6074f change uuid logger to trace level 2025-11-07 21:15:05 +08:00
Xuwznln
1d3a07a736 correct remove_resource stats 2025-11-07 21:15:03 +08:00
Xuwznln
cc2cd57cdf disable slave connect websocket 2025-11-07 20:39:26 +08:00
9 changed files with 315 additions and 190 deletions

View File

@@ -13,7 +13,7 @@ def start_backend(
graph=None,
controllers_config: dict = {},
bridges=[],
without_host: bool = False,
is_slave: bool = False,
visual: str = "None",
resources_mesh_config: dict = {},
**kwargs,
@@ -32,7 +32,7 @@ def start_backend(
raise ValueError(f"Unsupported backend: {backend}")
backend_thread = threading.Thread(
target=main if not without_host else slave,
target=main if not is_slave else slave,
args=(
devices_config,
resources_config,

View File

@@ -375,22 +375,23 @@ def main():
args_dict["bridges"] = []
# 获取通信客户端仅支持WebSocket
comm_client = get_communication_client()
if "websocket" in args_dict["app_bridges"]:
args_dict["bridges"].append(comm_client)
if "fastapi" in args_dict["app_bridges"]:
args_dict["bridges"].append(http_client)
if "websocket" in args_dict["app_bridges"]:
# 获取通信客户端仅支持WebSocket
if BasicConfig.is_host_mode:
comm_client = get_communication_client()
if "websocket" in args_dict["app_bridges"]:
args_dict["bridges"].append(comm_client)
def _exit(signum, frame):
comm_client.stop()
sys.exit(0)
def _exit(signum, frame):
comm_client.stop()
sys.exit(0)
signal.signal(signal.SIGINT, _exit)
signal.signal(signal.SIGTERM, _exit)
comm_client.start()
else:
print_status("SlaveMode跳过Websocket连接")
signal.signal(signal.SIGINT, _exit)
signal.signal(signal.SIGTERM, _exit)
comm_client.start()
args_dict["resources_mesh_config"] = {}
args_dict["resources_edge_config"] = resource_edge_info
# web visiualize 2D

View File

@@ -1,11 +1,12 @@
import json
import time
from typing import Optional, Tuple, Dict, Any
from unilabos.utils.log import logger
from unilabos.utils.type_check import TypeEncoder
def register_devices_and_resources(lab_registry):
def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
"""
注册设备和资源到服务器仅支持HTTP
"""
@@ -28,6 +29,8 @@ def register_devices_and_resources(lab_registry):
resources_to_register[resource_info["id"]] = resource_info
logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}")
if gather_only:
return devices_to_register, resources_to_register
# 注册设备
if devices_to_register:
try:

View File

@@ -1,11 +1,11 @@
from __future__ import annotations
import re
import traceback
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
from collections import Counter
import asyncio
import time
import pprint as pp
import traceback
from collections import Counter
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod
from pylabrobot.liquid_handling.standard import GripDirection

View File

@@ -6,11 +6,12 @@ from typing import Optional, Dict, Any, List
import rclpy
from unilabos_msgs.srv._serial_command import SerialCommand_Response
from unilabos.app.register import register_devices_and_resources
from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet
from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher
from unilabos_msgs.srv import SerialCommand # type: ignore
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
from rclpy.timer import Timer
@@ -108,66 +109,51 @@ def slave(
rclpy_init_args: List[str] = ["--log-level", "debug"],
) -> None:
"""从节点函数"""
# 1. 初始化 ROS2
if not rclpy.ok():
rclpy.init(args=rclpy_init_args)
executor = rclpy.__executor
if not executor:
executor = rclpy.__executor = MultiThreadedExecutor()
devices_instances = {}
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type != "device":
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
devices_instances[device_id] = d
# 默认初始化
# if d is not None and isinstance(d, Node):
# executor.add_node(d)
# else:
# print(f"Warning: Device {device_id} could not be initialized or is not a valid Node")
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
executor.add_node(n)
if visual != "disable":
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
resource_mesh_manager = ResourceMeshManager(
resources_mesh_config,
resources_config, # type: ignore FIXME
resource_tracker=DeviceNodeResourceTracker(),
device_id="resource_mesh_manager",
)
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
executor.add_node(resource_mesh_manager)
executor.add_node(joint_republisher)
# 1.5 启动 executor 线程
thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
thread.start()
# 2. 创建 Slave Machine Node
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
executor.add_node(n)
# 3. 向 Host 报送节点信息和物料,获取 UUID 映射
uuid_mapping = {}
if not BasicConfig.slave_no_host:
# 3.1 报送节点信息
sclient = n.create_client(SerialCommand, "/node_info_update")
sclient.wait_for_service()
registry_config = {}
devices_to_register, resources_to_register = register_devices_and_resources(lab_registry, True)
registry_config.update(devices_to_register)
registry_config.update(resources_to_register)
request = SerialCommand.Request()
request.command = json.dumps(
{
"machine_name": BasicConfig.machine_name,
"type": "slave",
"devices_config": devices_config.dump(),
"registry_config": lab_registry.obtain_registry_device_info(),
"registry_config": registry_config,
},
ensure_ascii=False,
cls=TypeEncoder,
)
response = sclient.call_async(request).result()
sclient.call_async(request).result()
logger.info(f"Slave node info updated.")
# 使用新的 c2s_update_resource_tree 服务
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
rclient.wait_for_service()
# 序列化 ResourceTreeSet 为 JSON
# 3.2 报送物料树,获取 UUID 映射
if resources_config:
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
rclient.wait_for_service()
request = SerialCommand.Request()
request.command = json.dumps(
{
@@ -180,35 +166,61 @@ def slave(
},
ensure_ascii=False,
)
tree_response: SerialCommand_Response = rclient.call_async(request).result()
tree_response: SerialCommand_Response = rclient.call(request)
uuid_mapping = json.loads(tree_response.response)
# 创建反向映射new_uuid -> old_uuid
reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
for node in resources_config.root_nodes:
if node.res_content.type == "device":
for sub_node in node.children:
# 只有二级子设备
if sub_node.res_content.type != "device":
device_tracker = devices_instances[node.res_content.id].resource_tracker
# sub_node.res_content.uuid 已经是新UUID需要用旧UUID去查找
old_uuid = reverse_uuid_mapping.get(sub_node.res_content.uuid)
if old_uuid:
# 找到旧UUID使用UUID查找
resource_instance = device_tracker.figure_resource({"uuid": old_uuid})
else:
# 未找到旧UUID使用name查找
resource_instance = device_tracker.figure_resource({"name": sub_node.res_content.name})
device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
logger.info(f"Slave resource tree added. UUID mapping: {len(uuid_mapping)} nodes")
# 3.3 使用 UUID 映射更新 resources_config 的 UUID参考 client.py 逻辑)
old_uuids = {node.res_content.uuid: node for node in resources_config.all_nodes}
for old_uuid, node in old_uuids.items():
if old_uuid in uuid_mapping:
new_uuid = uuid_mapping[old_uuid]
node.res_content.uuid = new_uuid
# 更新所有子节点的 parent_uuid
for child in node.children:
child.res_content.parent_uuid = new_uuid
else:
logger.error("Slave模式不允许新增非设备节点下的物料")
continue
if tree_response:
logger.info(f"Slave resource tree added. Response: {tree_response.response}")
else:
logger.warning("Slave resource tree add response is None")
logger.warning(f"资源UUID未更新: {old_uuid}")
else:
logger.info("No resources to add.")
# 4. 初始化所有设备实例(此时 resources_config 的 UUID 已更新)
devices_instances = {}
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type == "device":
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
if d is not None:
devices_instances[device_id] = d
logger.info(f"Device {device_id} initialized.")
else:
logger.warning(f"Device {device_id} initialization failed.")
# 5. 如果启用可视化,创建可视化相关节点
if visual != "disable":
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
# 将 ResourceTreeSet 转换为 list 用于 visual 组件
resources_list = (
[node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes]
if resources_config
else []
)
resource_mesh_manager = ResourceMeshManager(
resources_mesh_config,
resources_list,
resource_tracker=DeviceNodeResourceTracker(),
device_id="resource_mesh_manager",
)
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
lh_joint_pub = LiquidHandlerJointPublisher(
resources_config=resources_list, resource_tracker=DeviceNodeResourceTracker()
)
executor.add_node(resource_mesh_manager)
executor.add_node(joint_republisher)
executor.add_node(lh_joint_pub)
# 7. 保持运行
while True:
time.sleep(1)

View File

@@ -638,6 +638,145 @@ class BaseROS2DeviceNode(Node, Generic[T]):
- remove: 从资源树中移除资源
"""
from pylabrobot.resources.resource import Resource as ResourcePLR
def _handle_add(
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
) -> Dict[str, Any]:
"""
处理资源添加操作的内部函数
Args:
plr_resources: PLR资源列表
tree_set: 资源树集合
additional_add_params: 额外的添加参数
Returns:
操作结果字典
"""
for plr_resource, tree in zip(plr_resources, tree_set.trees):
self.resource_tracker.add_resource(plr_resource)
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
func = getattr(self.driver_instance, "resource_tree_add", None)
if callable(func):
func(plr_resources)
return {"success": True, "action": "add"}
def _handle_remove(resources_uuid: List[str]) -> Dict[str, Any]:
"""
处理资源移除操作的内部函数
Args:
resources_uuid: 要移除的资源UUID列表
Returns:
操作结果字典,包含移除的资源列表
"""
found_resources: List[List[Union[ResourcePLR, dict]]] = self.resource_tracker.figure_resource(
[{"uuid": uid} for uid in resources_uuid], try_mode=True
)
found_plr_resources = []
other_plr_resources = []
for found_resource in found_resources:
for resource in found_resource:
if issubclass(resource.__class__, ResourcePLR):
found_plr_resources.append(resource)
else:
other_plr_resources.append(resource)
# 调用driver的remove回调
func = getattr(self.driver_instance, "resource_tree_remove", None)
if callable(func):
func(found_plr_resources)
# 从parent卸载并从tracker移除
for plr_resource in found_plr_resources:
if plr_resource.parent is not None:
plr_resource.parent.unassign_child_resource(plr_resource)
self.resource_tracker.remove_resource(plr_resource)
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
for other_plr_resource in other_plr_resources:
self.resource_tracker.remove_resource(other_plr_resource)
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
return {
"success": True,
"action": "remove",
# "removed_plr": found_plr_resources,
# "removed_other": other_plr_resources,
}
def _handle_update(
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
) -> Dict[str, Any]:
"""
处理资源更新操作的内部函数
Args:
plr_resources: PLR资源列表包含新状态
tree_set: 资源树集合
additional_add_params: 额外的参数
Returns:
操作结果字典
"""
for plr_resource, tree in zip(plr_resources, tree_set.trees):
states = plr_resource.serialize_all_state()
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
)
# Update操作中包含改名需要先remove再add
if original_instance.name != plr_resource.name:
old_name = original_instance.name
new_name = plr_resource.name
self.lab_logger().info(f"物料改名操作:{old_name} -> {new_name}")
# 收集所有相关的uuid包括子节点
_handle_remove([original_instance.unilabos_uuid])
original_instance.name = new_name
_handle_add([original_instance], tree_set, additional_add_params)
self.lab_logger().info(f"物料改名完成:{old_name} -> {new_name}")
# 常规更新:不涉及改名
original_parent_resource = original_instance.parent
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
self.lab_logger().info(
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} "
f"目标父节点{target_parent_resource_uuid} 更新"
)
# 更新extra
if getattr(plr_resource, "unilabos_extra", None) is not None:
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") # type: ignore # noqa: E501
# 如果父节点变化,需要重新挂载
if (
original_parent_resource_uuid != target_parent_resource_uuid
and original_parent_resource is not None
):
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
# 加载状态
original_instance.load_all_state(states)
child_count = len(original_instance.get_all_children())
self.lab_logger().info(
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] " f"及其子节点 {child_count}"
)
# 调用driver的update回调
func = getattr(self.driver_instance, "resource_tree_update", None)
if callable(func):
func(plr_resources)
return {"success": True, "action": "update"}
try:
data = json.loads(req.command)
results = []
@@ -656,7 +795,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
].call_async(
SerialCommand.Request(
command=json.dumps(
{"data": {"data": resources_uuid, "with_children": True if action == "add" else "update"}, "action": "get"}
{"data": {"data": resources_uuid, "with_children": True if action == "add" else False}, "action": "get"}
)
)
) # type: ignore
@@ -664,68 +803,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
tree_set = ResourceTreeSet.from_raw_list(raw_nodes)
try:
if action == "add":
# 添加资源到资源跟踪器
if tree_set is None:
raise ValueError("tree_set不能为None")
plr_resources = tree_set.to_plr_resources()
for plr_resource, tree in zip(plr_resources, tree_set.trees):
self.resource_tracker.add_resource(plr_resource)
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
func = getattr(self.driver_instance, "resource_tree_add", None)
if callable(func):
func(plr_resources)
results.append({"success": True, "action": "add"})
result = _handle_add(plr_resources, tree_set, additional_add_params)
results.append(result)
elif action == "update":
# 更新资源
if tree_set is None:
raise ValueError("tree_set不能为None")
plr_resources = tree_set.to_plr_resources()
for plr_resource, tree in zip(plr_resources, tree_set.trees):
states = plr_resource.serialize_all_state()
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
)
original_parent_resource = original_instance.parent
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
self.lab_logger().info(
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} 目标父节点{target_parent_resource_uuid} 更新"
)
# todo: 对extra进行update
if getattr(plr_resource, "unilabos_extra", None) is not None:
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra")
if original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None:
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
original_instance.load_all_state(states)
self.lab_logger().info(
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())}"
)
func = getattr(self.driver_instance, "resource_tree_update", None)
if callable(func):
func(plr_resources)
results.append({"success": True, "action": "update"})
result = _handle_update(plr_resources, tree_set, additional_add_params)
results.append(result)
elif action == "remove":
# 移除资源
found_resources: List[List[Union[ResourcePLR, dict]]] = self.resource_tracker.figure_resource(
[{"uuid": uid} for uid in resources_uuid], try_mode=True
)
found_plr_resources = []
other_plr_resources = []
for found_resource in found_resources:
for resource in found_resource:
if issubclass(resource.__class__, ResourcePLR):
found_plr_resources.append(resource)
else:
other_plr_resources.append(resource)
func = getattr(self.driver_instance, "resource_tree_remove", None)
if callable(func):
func(found_plr_resources)
for plr_resource in found_plr_resources:
if plr_resource.parent is not None:
plr_resource.parent.unassign_child_resource(plr_resource)
self.resource_tracker.remove_resource(plr_resource)
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
for other_plr_resource in other_plr_resources:
self.resource_tracker.remove_resource(other_plr_resource)
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
results.append({"success": True, "action": "remove"})
result = _handle_remove(resources_uuid)
results.append(result)
except Exception as e:
error_msg = f"Error processing {action} operation: {str(e)}"
self.lab_logger().error(f"[Resource Tree Update] {error_msg}")
@@ -734,7 +825,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 返回处理结果
result_json = {"results": results, "total": len(data)}
res.response = json.dumps(result_json, ensure_ascii=False)
res.response = json.dumps(result_json, ensure_ascii=False, cls=TypeEncoder)
self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations")
except json.JSONDecodeError as e:
@@ -1004,9 +1095,14 @@ class BaseROS2DeviceNode(Node, Generic[T]):
# 通过资源跟踪器获取本地实例
final_resources = queried_resources if is_sequence else queried_resources[0]
final_resources = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) if not is_sequence else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
]
final_resources = (
self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)
if not is_sequence
else [
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False)
for res in queried_resources
]
)
action_kwargs[k] = final_resources
except Exception as e:
@@ -1227,6 +1323,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
raise JsonCommandInitError(
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
)
def _convert_resource_sync(self, resource_data: Dict[str, Any]):
"""同步转换资源数据为实例"""
# 创建资源查询请求

View File

@@ -165,29 +165,16 @@ class HostNode(BaseROS2DeviceNode):
# resources_config 的 root node 是
# # 创建反向映射new_uuid -> old_uuid
# reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
# for tree in resources_config.trees:
# node = tree.root_node
# if node.res_content.type == "device":
# if node.res_content.id == "host_node":
# continue
# # slave节点走c2s更新接口拿到add自行update uuid
# device_tracker = self.devices_instances[node.res_content.id].resource_tracker
# old_uuid = reverse_uuid_mapping.get(node.res_content.uuid)
# if old_uuid:
# # 找到旧UUID使用UUID查找
# resource_instance = device_tracker.uuid_to_resources.get(old_uuid)
# else:
# # 未找到旧UUID使用name查找
# resource_instance = device_tracker.figure_resource(
# {"name": node.res_content.name}
# )
# device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
# else:
# try:
# for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
# self.resource_tracker.add_resource(plr_resource)
# except Exception as ex:
# self.lab_logger().warning("[Host Node-Resource] 根节点物料序列化失败!")
for tree in resources_config.trees:
node = tree.root_node
if node.res_content.type == "device":
continue
else:
try:
for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
self._resource_tracker.add_resource(plr_resource)
except Exception as ex:
self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!")
except Exception as ex:
logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}")
# 初始化Node基类传递空参数覆盖列表
@@ -878,11 +865,10 @@ class HostNode(BaseROS2DeviceNode):
success = False
uuid_mapping = {}
if len(self.bridges) > 0:
from unilabos.app.web.client import HTTPClient
from unilabos.app.web.client import HTTPClient, http_client
client: HTTPClient = self.bridges[-1]
resource_start_time = time.time()
uuid_mapping = client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
uuid_mapping = http_client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
success = True
resource_end_time = time.time()
self.lab_logger().info(
@@ -990,9 +976,10 @@ class HostNode(BaseROS2DeviceNode):
"""
更新节点信息回调
"""
self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
# self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
try:
from unilabos.app.communication import get_communication_client
from unilabos.app.web.client import HTTPClient, http_client
info = json.loads(request.command)
if "SYNC_SLAVE_NODE_INFO" in info:
@@ -1001,10 +988,10 @@ class HostNode(BaseROS2DeviceNode):
edge_device_id = info["edge_device_id"]
self.device_machine_names[edge_device_id] = machine_name
else:
comm_client = get_communication_client()
registry_config = info["registry_config"]
for device_config in registry_config:
comm_client.publish_registry(device_config["id"], device_config)
devices_config = info.pop("devices_config")
registry_config = info.pop("registry_config")
if registry_config:
http_client.resource_registry({"resources": registry_config})
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK"
except Exception as e:
@@ -1030,10 +1017,9 @@ class HostNode(BaseROS2DeviceNode):
success = False
if len(self.bridges) > 0: # 边的提交待定
from unilabos.app.web.client import HTTPClient
from unilabos.app.web.client import HTTPClient, http_client
client: HTTPClient = self.bridges[-1]
r = client.resource_add(add_schema(resources))
r = http_client.resource_add(add_schema(resources))
success = bool(r)
response.success = success

View File

@@ -2,7 +2,7 @@ import traceback
import uuid
from pydantic import BaseModel, field_serializer, field_validator
from pydantic import Field
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
from unilabos.utils.log import logger
@@ -900,7 +900,7 @@ class DeviceNodeResourceTracker(object):
new_uuid = name_to_uuid_map[resource_name]
self.set_resource_uuid(res, new_uuid)
self.uuid_to_resources[new_uuid] = res
logger.debug(f"设置资源UUID: {resource_name} -> {new_uuid}")
logger.trace(f"设置资源UUID: {resource_name} -> {new_uuid}")
return 1
return 0
@@ -923,7 +923,8 @@ class DeviceNodeResourceTracker(object):
if resource_name and resource_name in name_to_extra_map:
extra = name_to_extra_map[resource_name]
self.set_resource_extra(res, extra)
logger.debug(f"设置资源Extra: {resource_name} -> {extra}")
if len(extra):
logger.debug(f"设置资源Extra: {resource_name} -> {extra}")
return 1
return 0
@@ -933,7 +934,7 @@ class DeviceNodeResourceTracker(object):
"""
递归遍历资源树更新所有节点的uuid
Args:0
Args:
resource: 资源对象可以是dict或实例
uuid_map: uuid映射字典{old_uuid: new_uuid}
@@ -958,6 +959,27 @@ class DeviceNodeResourceTracker(object):
return self._traverse_and_process(resource, process)
def loop_gather_uuid(self, resource) -> List[str]:
"""
递归遍历资源树收集所有节点的uuid
Args:
resource: 资源对象可以是dict或实例
Returns:
收集到的uuid列表
"""
uuid_list = []
def process(res):
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid:
uuid_list.append(current_uuid)
return 0
self._traverse_and_process(resource, process)
return uuid_list
def _collect_uuid_mapping(self, resource):
"""
递归收集资源的 uuid 映射到 uuid_to_resources
@@ -971,14 +993,15 @@ class DeviceNodeResourceTracker(object):
if current_uuid:
old = self.uuid_to_resources.get(current_uuid)
self.uuid_to_resources[current_uuid] = res
logger.debug(
logger.trace(
f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}"
)
return 1
return 0
self._traverse_and_process(resource, process)
def _remove_uuid_mapping(self, resource):
def _remove_uuid_mapping(self, resource) -> int:
"""
递归清除资源的 uuid 映射
@@ -990,10 +1013,11 @@ class DeviceNodeResourceTracker(object):
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
if current_uuid and current_uuid in self.uuid_to_resources:
self.uuid_to_resources.pop(current_uuid)
logger.debug(f"移除资源UUID映射: {current_uuid} -> {res}")
logger.trace(f"移除资源UUID映射: {current_uuid} -> {res}")
return 1
return 0
self._traverse_and_process(resource, process)
return self._traverse_and_process(resource, process)
def parent_resource(self, resource):
if id(resource) in self.resource2parent_resource:
@@ -1048,13 +1072,12 @@ class DeviceNodeResourceTracker(object):
removed = True
break
if not removed:
# 递归清除uuid映射
count = self._remove_uuid_mapping(resource)
if not count:
logger.warning(f"尝试移除不存在的资源: {resource}")
return False
# 递归清除uuid映射
self._remove_uuid_mapping(resource)
# 清除 resource2parent_resource 中与该资源相关的映射
# 需要清除1) 该资源作为 key 的映射 2) 该资源作为 value 的映射
keys_to_remove = []
@@ -1077,7 +1100,9 @@ class DeviceNodeResourceTracker(object):
self.uuid_to_resources.clear()
self.resource2parent_resource.clear()
def figure_resource(self, query_resource, try_mode=False):
def figure_resource(
self, query_resource: Union[List[Union[dict, "PLRResource"]], dict, "PLRResource"], try_mode=False
) -> Union[List[Union[dict, "PLRResource", List[Union[dict, "PLRResource"]]]], dict, "PLRResource"]:
if isinstance(query_resource, list):
return [self.figure_resource(r, try_mode) for r in query_resource]
elif (

View File

@@ -191,7 +191,8 @@ def configure_logger(loglevel=None):
# 添加处理器到根日志记录器
root_logger.addHandler(console_handler)
logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
# 配置日志系统
configure_logger()