mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 13:01:12 +00:00
enable slave mode
This commit is contained in:
@@ -13,7 +13,7 @@ def start_backend(
|
|||||||
graph=None,
|
graph=None,
|
||||||
controllers_config: dict = {},
|
controllers_config: dict = {},
|
||||||
bridges=[],
|
bridges=[],
|
||||||
without_host: bool = False,
|
is_slave: bool = False,
|
||||||
visual: str = "None",
|
visual: str = "None",
|
||||||
resources_mesh_config: dict = {},
|
resources_mesh_config: dict = {},
|
||||||
**kwargs,
|
**kwargs,
|
||||||
@@ -32,7 +32,7 @@ def start_backend(
|
|||||||
raise ValueError(f"Unsupported backend: {backend}")
|
raise ValueError(f"Unsupported backend: {backend}")
|
||||||
|
|
||||||
backend_thread = threading.Thread(
|
backend_thread = threading.Thread(
|
||||||
target=main if not without_host else slave,
|
target=main if not is_slave else slave,
|
||||||
args=(
|
args=(
|
||||||
devices_config,
|
devices_config,
|
||||||
resources_config,
|
resources_config,
|
||||||
|
|||||||
@@ -1,11 +1,12 @@
|
|||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
from typing import Optional, Tuple, Dict, Any
|
||||||
|
|
||||||
from unilabos.utils.log import logger
|
from unilabos.utils.log import logger
|
||||||
from unilabos.utils.type_check import TypeEncoder
|
from unilabos.utils.type_check import TypeEncoder
|
||||||
|
|
||||||
|
|
||||||
def register_devices_and_resources(lab_registry):
|
def register_devices_and_resources(lab_registry, gather_only=False) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
|
||||||
"""
|
"""
|
||||||
注册设备和资源到服务器(仅支持HTTP)
|
注册设备和资源到服务器(仅支持HTTP)
|
||||||
"""
|
"""
|
||||||
@@ -28,6 +29,8 @@ def register_devices_and_resources(lab_registry):
|
|||||||
resources_to_register[resource_info["id"]] = resource_info
|
resources_to_register[resource_info["id"]] = resource_info
|
||||||
logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}")
|
logger.debug(f"[UniLab Register] 收集资源: {resource_info['id']}")
|
||||||
|
|
||||||
|
if gather_only:
|
||||||
|
return devices_to_register, resources_to_register
|
||||||
# 注册设备
|
# 注册设备
|
||||||
if devices_to_register:
|
if devices_to_register:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -6,11 +6,12 @@ from typing import Optional, Dict, Any, List
|
|||||||
import rclpy
|
import rclpy
|
||||||
from unilabos_msgs.srv._serial_command import SerialCommand_Response
|
from unilabos_msgs.srv._serial_command import SerialCommand_Response
|
||||||
|
|
||||||
|
from unilabos.app.register import register_devices_and_resources
|
||||||
from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager
|
from unilabos.ros.nodes.presets.resource_mesh_manager import ResourceMeshManager
|
||||||
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet
|
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker, ResourceTreeSet
|
||||||
from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher
|
from unilabos.devices.ros_dev.liquid_handler_joint_publisher import LiquidHandlerJointPublisher
|
||||||
from unilabos_msgs.srv import SerialCommand # type: ignore
|
from unilabos_msgs.srv import SerialCommand # type: ignore
|
||||||
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
|
from rclpy.executors import MultiThreadedExecutor
|
||||||
from rclpy.node import Node
|
from rclpy.node import Node
|
||||||
from rclpy.timer import Timer
|
from rclpy.timer import Timer
|
||||||
|
|
||||||
@@ -108,66 +109,51 @@ def slave(
|
|||||||
rclpy_init_args: List[str] = ["--log-level", "debug"],
|
rclpy_init_args: List[str] = ["--log-level", "debug"],
|
||||||
) -> None:
|
) -> None:
|
||||||
"""从节点函数"""
|
"""从节点函数"""
|
||||||
|
# 1. 初始化 ROS2
|
||||||
if not rclpy.ok():
|
if not rclpy.ok():
|
||||||
rclpy.init(args=rclpy_init_args)
|
rclpy.init(args=rclpy_init_args)
|
||||||
executor = rclpy.__executor
|
executor = rclpy.__executor
|
||||||
if not executor:
|
if not executor:
|
||||||
executor = rclpy.__executor = MultiThreadedExecutor()
|
executor = rclpy.__executor = MultiThreadedExecutor()
|
||||||
devices_instances = {}
|
|
||||||
for device_config in devices_config.root_nodes:
|
|
||||||
device_id = device_config.res_content.id
|
|
||||||
if device_config.res_content.type != "device":
|
|
||||||
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
|
|
||||||
devices_instances[device_id] = d
|
|
||||||
# 默认初始化
|
|
||||||
# if d is not None and isinstance(d, Node):
|
|
||||||
# executor.add_node(d)
|
|
||||||
# else:
|
|
||||||
# print(f"Warning: Device {device_id} could not be initialized or is not a valid Node")
|
|
||||||
|
|
||||||
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
|
# 1.5 启动 executor 线程
|
||||||
executor.add_node(n)
|
|
||||||
|
|
||||||
if visual != "disable":
|
|
||||||
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
|
|
||||||
|
|
||||||
resource_mesh_manager = ResourceMeshManager(
|
|
||||||
resources_mesh_config,
|
|
||||||
resources_config, # type: ignore FIXME
|
|
||||||
resource_tracker=DeviceNodeResourceTracker(),
|
|
||||||
device_id="resource_mesh_manager",
|
|
||||||
)
|
|
||||||
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
|
|
||||||
|
|
||||||
executor.add_node(resource_mesh_manager)
|
|
||||||
executor.add_node(joint_republisher)
|
|
||||||
thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
|
thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|
||||||
|
# 2. 创建 Slave Machine Node
|
||||||
|
n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
|
||||||
|
executor.add_node(n)
|
||||||
|
|
||||||
|
# 3. 向 Host 报送节点信息和物料,获取 UUID 映射
|
||||||
|
uuid_mapping = {}
|
||||||
if not BasicConfig.slave_no_host:
|
if not BasicConfig.slave_no_host:
|
||||||
|
# 3.1 报送节点信息
|
||||||
sclient = n.create_client(SerialCommand, "/node_info_update")
|
sclient = n.create_client(SerialCommand, "/node_info_update")
|
||||||
sclient.wait_for_service()
|
sclient.wait_for_service()
|
||||||
|
|
||||||
|
registry_config = {}
|
||||||
|
devices_to_register, resources_to_register = register_devices_and_resources(lab_registry, True)
|
||||||
|
registry_config.update(devices_to_register)
|
||||||
|
registry_config.update(resources_to_register)
|
||||||
request = SerialCommand.Request()
|
request = SerialCommand.Request()
|
||||||
request.command = json.dumps(
|
request.command = json.dumps(
|
||||||
{
|
{
|
||||||
"machine_name": BasicConfig.machine_name,
|
"machine_name": BasicConfig.machine_name,
|
||||||
"type": "slave",
|
"type": "slave",
|
||||||
"devices_config": devices_config.dump(),
|
"devices_config": devices_config.dump(),
|
||||||
"registry_config": lab_registry.obtain_registry_device_info(),
|
"registry_config": registry_config,
|
||||||
},
|
},
|
||||||
ensure_ascii=False,
|
ensure_ascii=False,
|
||||||
cls=TypeEncoder,
|
cls=TypeEncoder,
|
||||||
)
|
)
|
||||||
response = sclient.call_async(request).result()
|
sclient.call_async(request).result()
|
||||||
logger.info(f"Slave node info updated.")
|
logger.info(f"Slave node info updated.")
|
||||||
|
|
||||||
# 使用新的 c2s_update_resource_tree 服务
|
# 3.2 报送物料树,获取 UUID 映射
|
||||||
|
if resources_config:
|
||||||
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
|
rclient = n.create_client(SerialCommand, "/c2s_update_resource_tree")
|
||||||
rclient.wait_for_service()
|
rclient.wait_for_service()
|
||||||
|
|
||||||
# 序列化 ResourceTreeSet 为 JSON
|
|
||||||
if resources_config:
|
|
||||||
request = SerialCommand.Request()
|
request = SerialCommand.Request()
|
||||||
request.command = json.dumps(
|
request.command = json.dumps(
|
||||||
{
|
{
|
||||||
@@ -180,35 +166,61 @@ def slave(
|
|||||||
},
|
},
|
||||||
ensure_ascii=False,
|
ensure_ascii=False,
|
||||||
)
|
)
|
||||||
tree_response: SerialCommand_Response = rclient.call_async(request).result()
|
tree_response: SerialCommand_Response = rclient.call(request)
|
||||||
uuid_mapping = json.loads(tree_response.response)
|
uuid_mapping = json.loads(tree_response.response)
|
||||||
# 创建反向映射:new_uuid -> old_uuid
|
logger.info(f"Slave resource tree added. UUID mapping: {len(uuid_mapping)} nodes")
|
||||||
reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
|
|
||||||
for node in resources_config.root_nodes:
|
# 3.3 使用 UUID 映射更新 resources_config 的 UUID(参考 client.py 逻辑)
|
||||||
if node.res_content.type == "device":
|
old_uuids = {node.res_content.uuid: node for node in resources_config.all_nodes}
|
||||||
for sub_node in node.children:
|
for old_uuid, node in old_uuids.items():
|
||||||
# 只有二级子设备
|
if old_uuid in uuid_mapping:
|
||||||
if sub_node.res_content.type != "device":
|
new_uuid = uuid_mapping[old_uuid]
|
||||||
device_tracker = devices_instances[node.res_content.id].resource_tracker
|
node.res_content.uuid = new_uuid
|
||||||
# sub_node.res_content.uuid 已经是新UUID,需要用旧UUID去查找
|
# 更新所有子节点的 parent_uuid
|
||||||
old_uuid = reverse_uuid_mapping.get(sub_node.res_content.uuid)
|
for child in node.children:
|
||||||
if old_uuid:
|
child.res_content.parent_uuid = new_uuid
|
||||||
# 找到旧UUID,使用UUID查找
|
|
||||||
resource_instance = device_tracker.figure_resource({"uuid": old_uuid})
|
|
||||||
else:
|
else:
|
||||||
# 未找到旧UUID,使用name查找
|
logger.warning(f"资源UUID未更新: {old_uuid}")
|
||||||
resource_instance = device_tracker.figure_resource({"name": sub_node.res_content.name})
|
|
||||||
device_tracker.loop_update_uuid(resource_instance, uuid_mapping)
|
|
||||||
else:
|
|
||||||
logger.error("Slave模式不允许新增非设备节点下的物料")
|
|
||||||
continue
|
|
||||||
if tree_response:
|
|
||||||
logger.info(f"Slave resource tree added. Response: {tree_response.response}")
|
|
||||||
else:
|
|
||||||
logger.warning("Slave resource tree add response is None")
|
|
||||||
else:
|
else:
|
||||||
logger.info("No resources to add.")
|
logger.info("No resources to add.")
|
||||||
|
|
||||||
|
# 4. 初始化所有设备实例(此时 resources_config 的 UUID 已更新)
|
||||||
|
devices_instances = {}
|
||||||
|
for device_config in devices_config.root_nodes:
|
||||||
|
device_id = device_config.res_content.id
|
||||||
|
if device_config.res_content.type == "device":
|
||||||
|
d = initialize_device_from_dict(device_id, device_config.get_nested_dict())
|
||||||
|
if d is not None:
|
||||||
|
devices_instances[device_id] = d
|
||||||
|
logger.info(f"Device {device_id} initialized.")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Device {device_id} initialization failed.")
|
||||||
|
|
||||||
|
# 5. 如果启用可视化,创建可视化相关节点
|
||||||
|
if visual != "disable":
|
||||||
|
from unilabos.ros.nodes.presets.joint_republisher import JointRepublisher
|
||||||
|
|
||||||
|
# 将 ResourceTreeSet 转换为 list 用于 visual 组件
|
||||||
|
resources_list = (
|
||||||
|
[node.res_content.model_dump(by_alias=True) for node in resources_config.all_nodes]
|
||||||
|
if resources_config
|
||||||
|
else []
|
||||||
|
)
|
||||||
|
resource_mesh_manager = ResourceMeshManager(
|
||||||
|
resources_mesh_config,
|
||||||
|
resources_list,
|
||||||
|
resource_tracker=DeviceNodeResourceTracker(),
|
||||||
|
device_id="resource_mesh_manager",
|
||||||
|
)
|
||||||
|
joint_republisher = JointRepublisher("joint_republisher", DeviceNodeResourceTracker())
|
||||||
|
lh_joint_pub = LiquidHandlerJointPublisher(
|
||||||
|
resources_config=resources_list, resource_tracker=DeviceNodeResourceTracker()
|
||||||
|
)
|
||||||
|
executor.add_node(resource_mesh_manager)
|
||||||
|
executor.add_node(joint_republisher)
|
||||||
|
executor.add_node(lh_joint_pub)
|
||||||
|
|
||||||
|
# 7. 保持运行
|
||||||
while True:
|
while True:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
|||||||
@@ -878,11 +878,10 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
success = False
|
success = False
|
||||||
uuid_mapping = {}
|
uuid_mapping = {}
|
||||||
if len(self.bridges) > 0:
|
if len(self.bridges) > 0:
|
||||||
from unilabos.app.web.client import HTTPClient
|
from unilabos.app.web.client import HTTPClient, http_client
|
||||||
|
|
||||||
client: HTTPClient = self.bridges[-1]
|
|
||||||
resource_start_time = time.time()
|
resource_start_time = time.time()
|
||||||
uuid_mapping = client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
|
uuid_mapping = http_client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
|
||||||
success = True
|
success = True
|
||||||
resource_end_time = time.time()
|
resource_end_time = time.time()
|
||||||
self.lab_logger().info(
|
self.lab_logger().info(
|
||||||
@@ -990,9 +989,10 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
"""
|
"""
|
||||||
更新节点信息回调
|
更新节点信息回调
|
||||||
"""
|
"""
|
||||||
self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
|
# self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
|
||||||
try:
|
try:
|
||||||
from unilabos.app.communication import get_communication_client
|
from unilabos.app.communication import get_communication_client
|
||||||
|
from unilabos.app.web.client import HTTPClient, http_client
|
||||||
|
|
||||||
info = json.loads(request.command)
|
info = json.loads(request.command)
|
||||||
if "SYNC_SLAVE_NODE_INFO" in info:
|
if "SYNC_SLAVE_NODE_INFO" in info:
|
||||||
@@ -1001,10 +1001,10 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
edge_device_id = info["edge_device_id"]
|
edge_device_id = info["edge_device_id"]
|
||||||
self.device_machine_names[edge_device_id] = machine_name
|
self.device_machine_names[edge_device_id] = machine_name
|
||||||
else:
|
else:
|
||||||
comm_client = get_communication_client()
|
devices_config = info.pop("devices_config")
|
||||||
registry_config = info["registry_config"]
|
registry_config = info.pop("registry_config")
|
||||||
for device_config in registry_config:
|
if registry_config:
|
||||||
comm_client.publish_registry(device_config["id"], device_config)
|
http_client.resource_registry({"resources": registry_config})
|
||||||
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
|
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
|
||||||
response.response = "OK"
|
response.response = "OK"
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -1030,10 +1030,9 @@ class HostNode(BaseROS2DeviceNode):
|
|||||||
|
|
||||||
success = False
|
success = False
|
||||||
if len(self.bridges) > 0: # 边的提交待定
|
if len(self.bridges) > 0: # 边的提交待定
|
||||||
from unilabos.app.web.client import HTTPClient
|
from unilabos.app.web.client import HTTPClient, http_client
|
||||||
|
|
||||||
client: HTTPClient = self.bridges[-1]
|
r = http_client.resource_add(add_schema(resources))
|
||||||
r = client.resource_add(add_schema(resources))
|
|
||||||
success = bool(r)
|
success = bool(r)
|
||||||
|
|
||||||
response.success = success
|
response.success = success
|
||||||
|
|||||||
@@ -1007,7 +1007,7 @@ class DeviceNodeResourceTracker(object):
|
|||||||
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
|
current_uuid = self._get_resource_attr(res, "uuid", "unilabos_uuid")
|
||||||
if current_uuid and current_uuid in self.uuid_to_resources:
|
if current_uuid and current_uuid in self.uuid_to_resources:
|
||||||
self.uuid_to_resources.pop(current_uuid)
|
self.uuid_to_resources.pop(current_uuid)
|
||||||
logger.debug(f"移除资源UUID映射: {current_uuid} -> {res}")
|
logger.trace(f"移除资源UUID映射: {current_uuid} -> {res}")
|
||||||
return 1
|
return 1
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|||||||
@@ -191,7 +191,8 @@ def configure_logger(loglevel=None):
|
|||||||
|
|
||||||
# 添加处理器到根日志记录器
|
# 添加处理器到根日志记录器
|
||||||
root_logger.addHandler(console_handler)
|
root_logger.addHandler(console_handler)
|
||||||
|
logging.getLogger("asyncio").setLevel(logging.INFO)
|
||||||
|
logging.getLogger("urllib3").setLevel(logging.INFO)
|
||||||
|
|
||||||
# 配置日志系统
|
# 配置日志系统
|
||||||
configure_logger()
|
configure_logger()
|
||||||
|
|||||||
Reference in New Issue
Block a user