mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-15 13:44:39 +00:00
Compare commits
7 Commits
39bb7dc627
...
c35da65b15
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c35da65b15 | ||
|
|
659cf05be6 | ||
|
|
3b8deb4d1d | ||
|
|
c796615f9f | ||
|
|
a5bad6074f | ||
|
|
1d3a07a736 | ||
|
|
cc2cd57cdf |
@@ -13,7 +13,7 @@ def start_backend(
|
||||
graph=None,
|
||||
controllers_config: dict = {},
|
||||
bridges=[],
|
||||
without_host: bool = False,
|
||||
is_slave: bool = False,
|
||||
visual: str = "None",
|
||||
resources_mesh_config: dict = {},
|
||||
**kwargs,
|
||||
@@ -32,7 +32,7 @@ def start_backend(
|
||||
raise ValueError(f"Unsupported backend: {backend}")
|
||||
|
||||
backend_thread = threading.Thread(
|
||||
target=main if not without_host else slave,
|
||||
target=main if not is_slave else slave,
|
||||
args=(
|
||||
devices_config,
|
||||
resources_config,
|
||||
|
||||
@@ -375,22 +375,23 @@ def main():
|
||||
|
||||
args_dict["bridges"] = []
|
||||
|
||||
# 获取通信客户端(仅支持WebSocket)
|
||||
comm_client = get_communication_client()
|
||||
|
||||
if "websocket" in args_dict["app_bridges"]:
|
||||
args_dict["bridges"].append(comm_client)
|
||||
if "fastapi" in args_dict["app_bridges"]:
|
||||
args_dict["bridges"].append(http_client)
|
||||
if "websocket" in args_dict["app_bridges"]:
|
||||
# 获取通信客户端(仅支持WebSocket)
|
||||
if BasicConfig.is_host_mode:
|
||||
comm_client = get_communication_client()
|
||||
if "websocket" in args_dict["app_bridges"]:
|
||||
args_dict["bridges"].append(comm_client)
|
||||
def _exit(signum, frame):
|
||||
comm_client.stop()
|
||||
sys.exit(0)
|
||||
|
||||
def _exit(signum, frame):
|
||||
comm_client.stop()
|
||||
sys.exit(0)
|
||||
signal.signal(signal.SIGINT, _exit)
|
||||
signal.signal(signal.SIGTERM, _exit)
|
||||
comm_client.start()
|
||||
else:
|
||||
print_status("SlaveMode跳过Websocket连接")
|
||||
|
||||
signal.signal(signal.SIGINT, _exit)
|
||||
signal.signal(signal.SIGTERM, _exit)
|
||||
comm_client.start()
|
||||
args_dict["resources_mesh_config"] = {}
|
||||
args_dict["resources_edge_config"] = resource_edge_info
|
||||
# web visiualize 2D
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import json
|
||||
import time
|
||||
from typing import Optional, Tuple, Dict, Any
|
||||
|
||||
from unilabos.utils.log import logger
|
||||
from unilabos.utils.type_check import TypeEncoder
|
||||
|
||||
|
||||
def register_devices_and_resources(lab_registry):
|
||||
def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
|
||||
"""
|
||||
注册设备和资源到服务器(仅支持HTTP)
|
||||
"""
|
||||
@@ -28,6 +29,8 @@ def register_devices_and_resources(lab_registry):
|
||||
resources_to_register[resource_info["id"]] = resource_info
|
||||
logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}")
|
||||
|
||||
if gather_only:
|
||||
return devices_to_register, resources_to_register
|
||||
# 注册设备
|
||||
if devices_to_register:
|
||||
try:
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
from __future__ import annotations
|
||||
import re
|
||||
import traceback
|
||||
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
|
||||
from collections import Counter
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
import pprint as pp
|
||||
import traceback
|
||||
from collections import Counter
|
||||
from typing import List, Sequence, Optional, Literal, Union, Iterator, Dict, Any, Callable, Set, cast
|
||||
|
||||
from pylabrobot.liquid_handling import LiquidHandler, LiquidHandlerBackend, LiquidHandlerChatterboxBackend, Strictness
|
||||
from pylabrobot.liquid_handling.liquid_handler import TipPresenceProbingMethod
|
||||
from pylabrobot.liquid_handling.standard import GripDirection
|
||||
|
||||
@@ -6,11 +6,12 @@ from typing import Optional, Dict, Any, List
|
||||
import rclpy
|
||||
from unilabos_msgs.srv._serial_command import SerialCommand_Response
|
||||
|
||||
from unilabos.app.register import register_devices_and_resources
|
||||
from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager
|
||||
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet
|
||||
from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher
|
||||
from unilabos_msgs.srv import SerialCommand # type: ignore
|
||||
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
|
||||
from rclpy.executors import MultiThreadedExecutor
|
||||
from rclpy.node import Node
|
||||
from rclpy.timer import Timer
|
||||
|
||||
@@ -108,66 +109,51 @@ def slave(
|
||||
rclpy_init_args: List[str] = ["--log-level", "debug"],
|
||||
) -> None:
|
||||
"""从节点函数"""
|
||||
# 1. 初始化 ROS2
|
||||
if not rclpy.ok():
|
||||
rclpy.init(args=rclpy_init_args)
|
||||
executor = rclpy.__executor
|
||||
if not executor:
|
||||
executor = rclpy.__executor = MultiThreadedExecutor()
|
||||
devices_instances = {}
|
||||
for device_config in devices_config.root_nodes:
|
||||
device_id = device_config.res_content.id
|
||||
if device_config.res_content.type != "device":
|
||||
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
|
||||
devices_instances[device_id] = d
|
||||
# 默认初始化
|
||||
# if d is not None and isinstance(d, Node):
|
||||
# executor.add_node(d)
|
||||
# else:
|
||||
# print(f"Warning: Device {device_id} could not be initialized or is not a valid Node")
|
||||
|
||||
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
|
||||
executor.add_node(n)
|
||||
|
||||
if visual != "disable":
|
||||
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
|
||||
|
||||
resource_mesh_manager = ResourceMeshManager(
|
||||
resources_mesh_config,
|
||||
resources_config, # type: ignore FIXME
|
||||
resource_tracker=DeviceNodeResourceTracker(),
|
||||
device_id="resource_mesh_manager",
|
||||
)
|
||||
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
|
||||
|
||||
executor.add_node(resource_mesh_manager)
|
||||
executor.add_node(joint_republisher)
|
||||
# 1.5 启动 executor 线程
|
||||
thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
|
||||
thread.start()
|
||||
|
||||
# 2. 创建 Slave Machine Node
|
||||
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
|
||||
executor.add_node(n)
|
||||
|
||||
# 3. 向 Host 报送节点信息和物料,获取 UUID 映射
|
||||
uuid_mapping = {}
|
||||
if not BasicConfig.slave_no_host:
|
||||
# 3.1 报送节点信息
|
||||
sclient = n.create_client(SerialCommand, "/node_info_update")
|
||||
sclient.wait_for_service()
|
||||
|
||||
registry_config = {}
|
||||
devices_to_register, resources_to_register = register_devices_and_resources(lab_registry, True)
|
||||
registry_config.update(devices_to_register)
|
||||
registry_config.update(resources_to_register)
|
||||
request = SerialCommand.Request()
|
||||
request.command = json.dumps(
|
||||
{
|
||||
"machine_name": BasicConfig.machine_name,
|
||||
"type": "slave",
|
||||
"devices_config": devices_config.dump(),
|
||||
"registry_config": lab_registry.obtain_registry_device_info(),
|
||||
"registry_config": registry_config,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
cls=TypeEncoder,
|
||||
)
|
||||
response = sclient.call_async(request).result()
|
||||
sclient.call_async(request).result()
|
||||
logger.info(f"Slave node info updated.")
|
||||
|
||||
# 使用新的 c2s_update_resource_tree 服务
|
||||
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
|
||||
rclient.wait_for_service()
|
||||
|
||||
# 序列化 ResourceTreeSet 为 JSON
|
||||
# 3.2 报送物料树,获取 UUID 映射
|
||||
if resources_config:
|
||||
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
|
||||
rclient.wait_for_service()
|
||||
|
||||
request = SerialCommand.Request()
|
||||
request.command = json.dumps(
|
||||
{
|
||||
@@ -180,35 +166,61 @@ def slave(
|
||||
},
|
||||
ensure_ascii=False,
|
||||
)
|
||||
tree_response: SerialCommand_Response = rclient.call_async(request).result()
|
||||
tree_response: SerialCommand_Response = rclient.call(request)
|
||||
uuid_mapping = json.loads(tree_response.response)
|
||||
# 创建反向映射:new_uuid -> old_uuid
|
||||
reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
|
||||
for node in resources_config.root_nodes:
|
||||
if node.res_content.type == "device":
|
||||
for sub_node in node.children:
|
||||
# 只有二级子设备
|
||||
if sub_node.res_content.type != "device":
|
||||
device_tracker = devices_instances[node.res_content.id].resource_tracker
|
||||
# sub_node.res_content.uuid 已经是新UUID,需要用旧UUID去查找
|
||||
old_uuid = reverse_uuid_mapping.get(sub_node.res_content.uuid)
|
||||
if old_uuid:
|
||||
# 找到旧UUID,使用UUID查找
|
||||
resource_instance = device_tracker.figure_resource({"uuid": old_uuid})
|
||||
else:
|
||||
# 未找到旧UUID,使用name查找
|
||||
resource_instance = device_tracker.figure_resource({"name": sub_node.res_content.name})
|
||||
device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
|
||||
logger.info(f"Slave resource tree added. UUID mapping: {len(uuid_mapping)} nodes")
|
||||
|
||||
# 3.3 使用 UUID 映射更新 resources_config 的 UUID(参考 client.py 逻辑)
|
||||
old_uuids = {node.res_content.uuid: node for node in resources_config.all_nodes}
|
||||
for old_uuid, node in old_uuids.items():
|
||||
if old_uuid in uuid_mapping:
|
||||
new_uuid = uuid_mapping[old_uuid]
|
||||
node.res_content.uuid = new_uuid
|
||||
# 更新所有子节点的 parent_uuid
|
||||
for child in node.children:
|
||||
child.res_content.parent_uuid = new_uuid
|
||||
else:
|
||||
logger.error("Slave模式不允许新增非设备节点下的物料")
|
||||
continue
|
||||
if tree_response:
|
||||
logger.info(f"Slave resource tree added. Response: {tree_response.response}")
|
||||
else:
|
||||
logger.warning("Slave resource tree add response is None")
|
||||
logger.warning(f"资源UUID未更新: {old_uuid}")
|
||||
else:
|
||||
logger.info("No resources to add.")
|
||||
|
||||
# 4. 初始化所有设备实例(此时 resources_config 的 UUID 已更新)
|
||||
devices_instances = {}
|
||||
for device_config in devices_config.root_nodes:
|
||||
device_id = device_config.res_content.id
|
||||
if device_config.res_content.type == "device":
|
||||
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
|
||||
if d is not None:
|
||||
devices_instances[device_id] = d
|
||||
logger.info(f"Device {device_id} initialized.")
|
||||
else:
|
||||
logger.warning(f"Device {device_id} initialization failed.")
|
||||
|
||||
# 5. 如果启用可视化,创建可视化相关节点
|
||||
if visual != "disable":
|
||||
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
|
||||
|
||||
# 将 ResourceTreeSet 转换为 list 用于 visual 组件
|
||||
resources_list = (
|
||||
[node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes]
|
||||
if resources_config
|
||||
else []
|
||||
)
|
||||
resource_mesh_manager = ResourceMeshManager(
|
||||
resources_mesh_config,
|
||||
resources_list,
|
||||
resource_tracker=DeviceNodeResourceTracker(),
|
||||
device_id="resource_mesh_manager",
|
||||
)
|
||||
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
|
||||
lh_joint_pub = LiquidHandlerJointPublisher(
|
||||
resources_config=resources_list, resource_tracker=DeviceNodeResourceTracker()
|
||||
)
|
||||
executor.add_node(resource_mesh_manager)
|
||||
executor.add_node(joint_republisher)
|
||||
executor.add_node(lh_joint_pub)
|
||||
|
||||
# 7. 保持运行
|
||||
while True:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
@@ -638,6 +638,145 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
- remove: 从资源树中移除资源
|
||||
"""
|
||||
from pylabrobot.resources.resource import Resource as ResourcePLR
|
||||
|
||||
def _handle_add(
|
||||
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
处理资源添加操作的内部函数
|
||||
|
||||
Args:
|
||||
plr_resources: PLR资源列表
|
||||
tree_set: 资源树集合
|
||||
additional_add_params: 额外的添加参数
|
||||
|
||||
Returns:
|
||||
操作结果字典
|
||||
"""
|
||||
for plr_resource, tree in zip(plr_resources, tree_set.trees):
|
||||
self.resource_tracker.add_resource(plr_resource)
|
||||
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
|
||||
|
||||
func = getattr(self.driver_instance, "resource_tree_add", None)
|
||||
if callable(func):
|
||||
func(plr_resources)
|
||||
|
||||
return {"success": True, "action": "add"}
|
||||
|
||||
def _handle_remove(resources_uuid: List[str]) -> Dict[str, Any]:
|
||||
"""
|
||||
处理资源移除操作的内部函数
|
||||
|
||||
Args:
|
||||
resources_uuid: 要移除的资源UUID列表
|
||||
|
||||
Returns:
|
||||
操作结果字典,包含移除的资源列表
|
||||
"""
|
||||
found_resources: List[List[Union[ResourcePLR, dict]]] = self.resource_tracker.figure_resource(
|
||||
[{"uuid": uid} for uid in resources_uuid], try_mode=True
|
||||
)
|
||||
found_plr_resources = []
|
||||
other_plr_resources = []
|
||||
|
||||
for found_resource in found_resources:
|
||||
for resource in found_resource:
|
||||
if issubclass(resource.__class__, ResourcePLR):
|
||||
found_plr_resources.append(resource)
|
||||
else:
|
||||
other_plr_resources.append(resource)
|
||||
|
||||
# 调用driver的remove回调
|
||||
func = getattr(self.driver_instance, "resource_tree_remove", None)
|
||||
if callable(func):
|
||||
func(found_plr_resources)
|
||||
|
||||
# 从parent卸载并从tracker移除
|
||||
for plr_resource in found_plr_resources:
|
||||
if plr_resource.parent is not None:
|
||||
plr_resource.parent.unassign_child_resource(plr_resource)
|
||||
self.resource_tracker.remove_resource(plr_resource)
|
||||
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
|
||||
|
||||
for other_plr_resource in other_plr_resources:
|
||||
self.resource_tracker.remove_resource(other_plr_resource)
|
||||
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"action": "remove",
|
||||
# "removed_plr": found_plr_resources,
|
||||
# "removed_other": other_plr_resources,
|
||||
}
|
||||
|
||||
def _handle_update(
|
||||
plr_resources: List[ResourcePLR], tree_set: ResourceTreeSet, additional_add_params: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
处理资源更新操作的内部函数
|
||||
|
||||
Args:
|
||||
plr_resources: PLR资源列表(包含新状态)
|
||||
tree_set: 资源树集合
|
||||
additional_add_params: 额外的参数
|
||||
|
||||
Returns:
|
||||
操作结果字典
|
||||
"""
|
||||
for plr_resource, tree in zip(plr_resources, tree_set.trees):
|
||||
states = plr_resource.serialize_all_state()
|
||||
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
|
||||
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
|
||||
)
|
||||
|
||||
# Update操作中包含改名:需要先remove再add
|
||||
if original_instance.name != plr_resource.name:
|
||||
old_name = original_instance.name
|
||||
new_name = plr_resource.name
|
||||
self.lab_logger().info(f"物料改名操作:{old_name} -> {new_name}")
|
||||
|
||||
# 收集所有相关的uuid(包括子节点)
|
||||
_handle_remove([original_instance.unilabos_uuid])
|
||||
original_instance.name = new_name
|
||||
_handle_add([original_instance], tree_set, additional_add_params)
|
||||
|
||||
self.lab_logger().info(f"物料改名完成:{old_name} -> {new_name}")
|
||||
|
||||
# 常规更新:不涉及改名
|
||||
original_parent_resource = original_instance.parent
|
||||
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
|
||||
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
|
||||
|
||||
self.lab_logger().info(
|
||||
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} "
|
||||
f"目标父节点{target_parent_resource_uuid} 更新"
|
||||
)
|
||||
|
||||
# 更新extra
|
||||
if getattr(plr_resource, "unilabos_extra", None) is not None:
|
||||
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra") # type: ignore # noqa: E501
|
||||
|
||||
# 如果父节点变化,需要重新挂载
|
||||
if (
|
||||
original_parent_resource_uuid != target_parent_resource_uuid
|
||||
and original_parent_resource is not None
|
||||
):
|
||||
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
||||
|
||||
# 加载状态
|
||||
original_instance.load_all_state(states)
|
||||
child_count = len(original_instance.get_all_children())
|
||||
self.lab_logger().info(
|
||||
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] " f"及其子节点 {child_count} 个"
|
||||
)
|
||||
|
||||
# 调用driver的update回调
|
||||
func = getattr(self.driver_instance, "resource_tree_update", None)
|
||||
if callable(func):
|
||||
func(plr_resources)
|
||||
|
||||
return {"success": True, "action": "update"}
|
||||
|
||||
try:
|
||||
data = json.loads(req.command)
|
||||
results = []
|
||||
@@ -656,7 +795,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
].call_async(
|
||||
SerialCommand.Request(
|
||||
command=json.dumps(
|
||||
{"data": {"data": resources_uuid, "with_children": True if action == "add" else "update"}, "action": "get"}
|
||||
{"data": {"data": resources_uuid, "with_children": True if action == "add" else False}, "action": "get"}
|
||||
)
|
||||
)
|
||||
) # type: ignore
|
||||
@@ -664,68 +803,20 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
tree_set = ResourceTreeSet.from_raw_list(raw_nodes)
|
||||
try:
|
||||
if action == "add":
|
||||
# 添加资源到资源跟踪器
|
||||
if tree_set is None:
|
||||
raise ValueError("tree_set不能为None")
|
||||
plr_resources = tree_set.to_plr_resources()
|
||||
for plr_resource, tree in zip(plr_resources, tree_set.trees):
|
||||
self.resource_tracker.add_resource(plr_resource)
|
||||
self.transfer_to_new_resource(plr_resource, tree, additional_add_params)
|
||||
func = getattr(self.driver_instance, "resource_tree_add", None)
|
||||
if callable(func):
|
||||
func(plr_resources)
|
||||
results.append({"success": True, "action": "add"})
|
||||
result = _handle_add(plr_resources, tree_set, additional_add_params)
|
||||
results.append(result)
|
||||
elif action == "update":
|
||||
# 更新资源
|
||||
if tree_set is None:
|
||||
raise ValueError("tree_set不能为None")
|
||||
plr_resources = tree_set.to_plr_resources()
|
||||
for plr_resource, tree in zip(plr_resources, tree_set.trees):
|
||||
states = plr_resource.serialize_all_state()
|
||||
original_instance: ResourcePLR = self.resource_tracker.figure_resource(
|
||||
{"uuid": tree.root_node.res_content.uuid}, try_mode=False
|
||||
)
|
||||
original_parent_resource = original_instance.parent
|
||||
original_parent_resource_uuid = getattr(original_parent_resource, "unilabos_uuid", None)
|
||||
target_parent_resource_uuid = tree.root_node.res_content.uuid_parent
|
||||
self.lab_logger().info(
|
||||
f"物料{original_instance} 原始父节点{original_parent_resource_uuid} 目标父节点{target_parent_resource_uuid} 更新"
|
||||
)
|
||||
# todo: 对extra进行update
|
||||
if getattr(plr_resource, "unilabos_extra", None) is not None:
|
||||
original_instance.unilabos_extra = getattr(plr_resource, "unilabos_extra")
|
||||
if original_parent_resource_uuid != target_parent_resource_uuid and original_parent_resource is not None:
|
||||
self.transfer_to_new_resource(original_instance, tree, additional_add_params)
|
||||
original_instance.load_all_state(states)
|
||||
self.lab_logger().info(
|
||||
f"更新了资源属性 {plr_resource}[{tree.root_node.res_content.uuid}] 及其子节点 {len(original_instance.get_all_children())} 个"
|
||||
)
|
||||
|
||||
func = getattr(self.driver_instance, "resource_tree_update", None)
|
||||
if callable(func):
|
||||
func(plr_resources)
|
||||
results.append({"success": True, "action": "update"})
|
||||
result = _handle_update(plr_resources, tree_set, additional_add_params)
|
||||
results.append(result)
|
||||
elif action == "remove":
|
||||
# 移除资源
|
||||
found_resources: List[List[Union[ResourcePLR, dict]]] = self.resource_tracker.figure_resource(
|
||||
[{"uuid": uid} for uid in resources_uuid], try_mode=True
|
||||
)
|
||||
found_plr_resources = []
|
||||
other_plr_resources = []
|
||||
for found_resource in found_resources:
|
||||
for resource in found_resource:
|
||||
if issubclass(resource.__class__, ResourcePLR):
|
||||
found_plr_resources.append(resource)
|
||||
else:
|
||||
other_plr_resources.append(resource)
|
||||
func = getattr(self.driver_instance, "resource_tree_remove", None)
|
||||
if callable(func):
|
||||
func(found_plr_resources)
|
||||
for plr_resource in found_plr_resources:
|
||||
if plr_resource.parent is not None:
|
||||
plr_resource.parent.unassign_child_resource(plr_resource)
|
||||
self.resource_tracker.remove_resource(plr_resource)
|
||||
self.lab_logger().info(f"移除物料 {plr_resource} 及其子节点")
|
||||
for other_plr_resource in other_plr_resources:
|
||||
self.resource_tracker.remove_resource(other_plr_resource)
|
||||
self.lab_logger().info(f"移除物料 {other_plr_resource} 及其子节点")
|
||||
results.append({"success": True, "action": "remove"})
|
||||
result = _handle_remove(resources_uuid)
|
||||
results.append(result)
|
||||
except Exception as e:
|
||||
error_msg = f"Error processing {action} operation: {str(e)}"
|
||||
self.lab_logger().error(f"[Resource Tree Update] {error_msg}")
|
||||
@@ -734,7 +825,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
|
||||
# 返回处理结果
|
||||
result_json = {"results": results, "total": len(data)}
|
||||
res.response = json.dumps(result_json, ensure_ascii=False)
|
||||
res.response = json.dumps(result_json, ensure_ascii=False, cls=TypeEncoder)
|
||||
self.lab_logger().info(f"[Resource Tree Update] Completed processing {len(data)} operations")
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
@@ -1004,9 +1095,14 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
|
||||
# 通过资源跟踪器获取本地实例
|
||||
final_resources = queried_resources if is_sequence else queried_resources[0]
|
||||
final_resources = self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False) if not is_sequence else [
|
||||
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False) for res in queried_resources
|
||||
]
|
||||
final_resources = (
|
||||
self.resource_tracker.figure_resource({"name": final_resources.name}, try_mode=False)
|
||||
if not is_sequence
|
||||
else [
|
||||
self.resource_tracker.figure_resource({"name": res.name}, try_mode=False)
|
||||
for res in queried_resources
|
||||
]
|
||||
)
|
||||
action_kwargs[k] = final_resources
|
||||
|
||||
except Exception as e:
|
||||
@@ -1227,6 +1323,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
|
||||
raise JsonCommandInitError(
|
||||
f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}"
|
||||
)
|
||||
|
||||
def _convert_resource_sync(self, resource_data: Dict[str, Any]):
|
||||
"""同步转换资源数据为实例"""
|
||||
# 创建资源查询请求
|
||||
|
||||
@@ -165,29 +165,16 @@ class HostNode(BaseROS2DeviceNode):
|
||||
# resources_config 的 root node 是
|
||||
# # 创建反向映射:new_uuid -> old_uuid
|
||||
# reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
|
||||
# for tree in resources_config.trees:
|
||||
# node = tree.root_node
|
||||
# if node.res_content.type == "device":
|
||||
# if node.res_content.id == "host_node":
|
||||
# continue
|
||||
# # slave节点走c2s更新接口,拿到add自行update uuid
|
||||
# device_tracker = self.devices_instances[node.res_content.id].resource_tracker
|
||||
# old_uuid = reverse_uuid_mapping.get(node.res_content.uuid)
|
||||
# if old_uuid:
|
||||
# # 找到旧UUID,使用UUID查找
|
||||
# resource_instance = device_tracker.uuid_to_resources.get(old_uuid)
|
||||
# else:
|
||||
# # 未找到旧UUID,使用name查找
|
||||
# resource_instance = device_tracker.figure_resource(
|
||||
# {"name": node.res_content.name}
|
||||
# )
|
||||
# device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
|
||||
# else:
|
||||
# try:
|
||||
# for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
|
||||
# self.resource_tracker.add_resource(plr_resource)
|
||||
# except Exception as ex:
|
||||
# self.lab_logger().warning("[Host Node-Resource] 根节点物料序列化失败!")
|
||||
for tree in resources_config.trees:
|
||||
node = tree.root_node
|
||||
if node.res_content.type == "device":
|
||||
continue
|
||||
else:
|
||||
try:
|
||||
for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
|
||||
self._resource_tracker.add_resource(plr_resource)
|
||||
except Exception as ex:
|
||||
self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!")
|
||||
except Exception as ex:
|
||||
logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}")
|
||||
# 初始化Node基类,传递空参数覆盖列表
|
||||
@@ -878,11 +865,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
success = False
|
||||
uuid_mapping = {}
|
||||
if len(self.bridges) > 0:
|
||||
from unilabos.app.web.client import HTTPClient
|
||||
from unilabos.app.web.client import HTTPClient, http_client
|
||||
|
||||
client: HTTPClient = self.bridges[-1]
|
||||
resource_start_time = time.time()
|
||||
uuid_mapping = client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
|
||||
uuid_mapping = http_client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
|
||||
success = True
|
||||
resource_end_time = time.time()
|
||||
self.lab_logger().info(
|
||||
@@ -990,9 +976,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
"""
|
||||
更新节点信息回调
|
||||
"""
|
||||
self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
|
||||
# self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
|
||||
try:
|
||||
from unilabos.app.communication import get_communication_client
|
||||
from unilabos.app.web.client import HTTPClient, http_client
|
||||
|
||||
info = json.loads(request.command)
|
||||
if "SYNC_SLAVE_NODE_INFO" in info:
|
||||
@@ -1001,10 +988,10 @@ class HostNode(BaseROS2DeviceNode):
|
||||
edge_device_id = info["edge_device_id"]
|
||||
self.device_machine_names[edge_device_id] = machine_name
|
||||
else:
|
||||
comm_client = get_communication_client()
|
||||
registry_config = info["registry_config"]
|
||||
for device_config in registry_config:
|
||||
comm_client.publish_registry(device_config["id"], device_config)
|
||||
devices_config = info.pop("devices_config")
|
||||
registry_config = info.pop("registry_config")
|
||||
if registry_config:
|
||||
http_client.resource_registry({"resources": registry_config})
|
||||
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
|
||||
response.response = "OK"
|
||||
except Exception as e:
|
||||
@@ -1030,10 +1017,9 @@ class HostNode(BaseROS2DeviceNode):
|
||||
|
||||
success = False
|
||||
if len(self.bridges) > 0: # 边的提交待定
|
||||
from unilabos.app.web.client import HTTPClient
|
||||
from unilabos.app.web.client import HTTPClient, http_client
|
||||
|
||||
client: HTTPClient = self.bridges[-1]
|
||||
r = client.resource_add(add_schema(resources))
|
||||
r = http_client.resource_add(add_schema(resources))
|
||||
success = bool(r)
|
||||
|
||||
response.success = success
|
||||
|
||||
@@ -2,7 +2,7 @@ import traceback
|
||||
import uuid
|
||||
from pydantic import BaseModel, field_serializer, field_validator
|
||||
from pydantic import Field
|
||||
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING
|
||||
from typing import List, Tuple, Any, Dict, Literal, Optional, cast, TYPE_CHECKING, Union
|
||||
|
||||
from unilabos.utils.log import logger
|
||||
|
||||
@@ -900,7 +900,7 @@ class DeviceNodeResourceTracker(object):
|
||||
new_uuid = name_to_uuid_map[resource_name]
|
||||
self.set_resource_uuid(res, new_uuid)
|
||||
self.uuid_to_resources[new_uuid] = res
|
||||
logger.debug(f"设置资源UUID: {resource_name} -> {new_uuid}")
|
||||
logger.trace(f"设置资源UUID: {resource_name} -> {new_uuid}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
@@ -923,7 +923,8 @@ class DeviceNodeResourceTracker(object):
|
||||
if resource_name and resource_name in name_to_extra_map:
|
||||
extra = name_to_extra_map[resource_name]
|
||||
self.set_resource_extra(res, extra)
|
||||
logger.debug(f"设置资源Extra: {resource_name} -> {extra}")
|
||||
if len(extra):
|
||||
logger.debug(f"设置资源Extra: {resource_name} -> {extra}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
@@ -933,7 +934,7 @@ class DeviceNodeResourceTracker(object):
|
||||
"""
|
||||
递归遍历资源树,更新所有节点的uuid
|
||||
|
||||
Args:0
|
||||
Args:
|
||||
resource: 资源对象(可以是dict或实例)
|
||||
uuid_map: uuid映射字典,{old_uuid: new_uuid}
|
||||
|
||||
@@ -958,6 +959,27 @@ class DeviceNodeResourceTracker(object):
|
||||
|
||||
return self._traverse_and_process(resource, process)
|
||||
|
||||
def loop_gather_uuid(self, resource) -> List[str]:
|
||||
"""
|
||||
递归遍历资源树,收集所有节点的uuid
|
||||
|
||||
Args:
|
||||
resource: 资源对象(可以是dict或实例)
|
||||
|
||||
Returns:
|
||||
收集到的uuid列表
|
||||
"""
|
||||
uuid_list = []
|
||||
|
||||
def process(res):
|
||||
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
|
||||
if current_uuid:
|
||||
uuid_list.append(current_uuid)
|
||||
return 0
|
||||
|
||||
self._traverse_and_process(resource, process)
|
||||
return uuid_list
|
||||
|
||||
def _collect_uuid_mapping(self, resource):
|
||||
"""
|
||||
递归收集资源的 uuid 映射到 uuid_to_resources
|
||||
@@ -971,14 +993,15 @@ class DeviceNodeResourceTracker(object):
|
||||
if current_uuid:
|
||||
old = self.uuid_to_resources.get(current_uuid)
|
||||
self.uuid_to_resources[current_uuid] = res
|
||||
logger.debug(
|
||||
logger.trace(
|
||||
f"收集资源UUID映射: {current_uuid} -> {res} {'' if old is None else f'(覆盖旧值: {old})'}"
|
||||
)
|
||||
return 1
|
||||
return 0
|
||||
|
||||
self._traverse_and_process(resource, process)
|
||||
|
||||
def _remove_uuid_mapping(self, resource):
|
||||
def _remove_uuid_mapping(self, resource) -> int:
|
||||
"""
|
||||
递归清除资源的 uuid 映射
|
||||
|
||||
@@ -990,10 +1013,11 @@ class DeviceNodeResourceTracker(object):
|
||||
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
|
||||
if current_uuid and current_uuid in self.uuid_to_resources:
|
||||
self.uuid_to_resources.pop(current_uuid)
|
||||
logger.debug(f"移除资源UUID映射: {current_uuid} -> {res}")
|
||||
logger.trace(f"移除资源UUID映射: {current_uuid} -> {res}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
self._traverse_and_process(resource, process)
|
||||
return self._traverse_and_process(resource, process)
|
||||
|
||||
def parent_resource(self, resource):
|
||||
if id(resource) in self.resource2parent_resource:
|
||||
@@ -1048,13 +1072,12 @@ class DeviceNodeResourceTracker(object):
|
||||
removed = True
|
||||
break
|
||||
|
||||
if not removed:
|
||||
# 递归清除uuid映射
|
||||
count = self._remove_uuid_mapping(resource)
|
||||
if not count:
|
||||
logger.warning(f"尝试移除不存在的资源: {resource}")
|
||||
return False
|
||||
|
||||
# 递归清除uuid映射
|
||||
self._remove_uuid_mapping(resource)
|
||||
|
||||
# 清除 resource2parent_resource 中与该资源相关的映射
|
||||
# 需要清除:1) 该资源作为 key 的映射 2) 该资源作为 value 的映射
|
||||
keys_to_remove = []
|
||||
@@ -1077,7 +1100,9 @@ class DeviceNodeResourceTracker(object):
|
||||
self.uuid_to_resources.clear()
|
||||
self.resource2parent_resource.clear()
|
||||
|
||||
def figure_resource(self, query_resource, try_mode=False):
|
||||
def figure_resource(
|
||||
self, query_resource: Union[List[Union[dict, "PLRResource"]], dict, "PLRResource"], try_mode=False
|
||||
) -> Union[List[Union[dict, "PLRResource", List[Union[dict, "PLRResource"]]]], dict, "PLRResource"]:
|
||||
if isinstance(query_resource, list):
|
||||
return [self.figure_resource(r, try_mode) for r in query_resource]
|
||||
elif (
|
||||
|
||||
@@ -191,7 +191,8 @@ def configure_logger(loglevel=None):
|
||||
|
||||
# 添加处理器到根日志记录器
|
||||
root_logger.addHandler(console_handler)
|
||||
|
||||
logging.getLogger("asyncio").setLevel(logging.INFO)
|
||||
logging.getLogger("urllib3").setLevel(logging.INFO)
|
||||
|
||||
# 配置日志系统
|
||||
configure_logger()
|
||||
|
||||
Reference in New Issue
Block a user