Merge remote-tracking branch 'origin/dev' into fork/q434343/device_visualization

This commit is contained in:
wznln
2025-05-01 16:33:51 +08:00
10 changed files with 206 additions and 88 deletions

View File

@@ -19,8 +19,8 @@
}, },
{ {
"id": "BottlesRack3", "id": "BottlesRack3",
"name": "Revvity上样盘3", "name": "上样盘3",
"parent": "Revvity", "parent": "HPLC",
"type": "plate", "type": "plate",
"class": null, "class": null,
"position": { "position": {

View File

@@ -13,7 +13,7 @@ ilabos_dir = os.path.dirname(os.path.dirname(current_dir))
if ilabos_dir not in sys.path: if ilabos_dir not in sys.path:
sys.path.append(ilabos_dir) sys.path.append(ilabos_dir)
from unilabos.config.config import load_config, BasicConfig from unilabos.config.config import load_config, BasicConfig, _update_config_from_env
from unilabos.utils.banner_print import print_status, print_unilab_banner from unilabos.utils.banner_print import print_status, print_unilab_banner
from unilabos.device_mesh.resource_visalization import ResourceVisualization from unilabos.device_mesh.resource_visalization import ResourceVisualization
@@ -86,8 +86,10 @@ def main():
args = parse_args() args = parse_args()
args_dict = vars(args) args_dict = vars(args)
# 加载配置文件 - 这里保持最先加载配置的逻辑 # 加载配置文件优先加载config然后从env读取
config_path = args_dict.get("config") config_path = args_dict.get("config")
if config_path is None:
config_path = os.environ.get("UNILABOS.BASICCONFIG.CONFIG_PATH", None)
if config_path: if config_path:
if not os.path.exists(config_path): if not os.path.exists(config_path):
print_status(f"配置文件 {config_path} 不存在", "error") print_status(f"配置文件 {config_path} 不存在", "error")
@@ -102,6 +104,9 @@ def main():
# 设置BasicConfig参数 # 设置BasicConfig参数
BasicConfig.is_host_mode = not args_dict.get("without_host", False) BasicConfig.is_host_mode = not args_dict.get("without_host", False)
BasicConfig.slave_no_host = args_dict.get("slave_no_host", False) BasicConfig.slave_no_host = args_dict.get("slave_no_host", False)
machine_name = os.popen("hostname").read().strip()
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
BasicConfig.machine_name = machine_name
from unilabos.resources.graphio import ( from unilabos.resources.graphio import (
read_node_link_json, read_node_link_json,

View File

@@ -168,11 +168,8 @@ class MQTTClient:
if self.mqtt_disable: if self.mqtt_disable:
return return
address = f"labs/{MQConfig.lab_id}/actions/" address = f"labs/{MQConfig.lab_id}/actions/"
action_type_name = action_info["title"] self.client.publish(address, json.dumps(action_info), qos=2)
action_info["title"] = action_id logger.debug(f"Action data published: address: {address}, {action_id}, {action_info}")
action_data = json.dumps({action_type_name: action_info}, ensure_ascii=False)
self.client.publish(address, action_data, qos=2)
logger.debug(f"Action data published: address: {address}, {action_data}")
mqtt_client = MQTTClient() mqtt_client = MQTTClient()

View File

@@ -333,7 +333,12 @@
<tr id="device-info-{{ loop.index }}" class="detail-row" style="display: none;"> <tr id="device-info-{{ loop.index }}" class="detail-row" style="display: none;">
<td colspan="5"> <td colspan="5">
<div class="content-full"> <div class="content-full">
<pre>{{ device.class|tojson(indent=4) }}</pre> {% if device.class %}
<pre>{{ device.class | tojson(indent=4) }}</pre>
{% else %}
<!-- 这里可以放占位内容,比如 -->
<pre>// No data</pre>
{% endif %}
{% if device.is_online %} {% if device.is_online %}
<div class="status-badge"><span class="online-status">在线</span></div> <div class="status-badge"><span class="online-status">在线</span></div>
@@ -366,7 +371,12 @@
<button class="copy-btn" onclick="copyToClipboard(this.previousElementSibling.textContent, event)">复制</button> <button class="copy-btn" onclick="copyToClipboard(this.previousElementSibling.textContent, event)">复制</button>
<button class="debug-btn" onclick="toggleDebugInfo(this, event)">调试</button> <button class="debug-btn" onclick="toggleDebugInfo(this, event)">调试</button>
<div class="debug-info" style="display:none;"> <div class="debug-info" style="display:none;">
<pre>{{ action_info|tojson(indent=2) }}</pre> {% if action_info %}
<pre>{{ action_info | tojson(indent=4) }}</pre>
{% else %}
<!-- 这里可以放占位内容,比如 -->
<pre>// No data</pre>
{% endif %}
</div> </div>
</div> </div>

View File

@@ -30,13 +30,13 @@ def get_host_node_info() -> Dict[str, Any]:
return host_info return host_info
host_info["available"] = True host_info["available"] = True
host_info["devices"] = { host_info["devices"] = {
device_id: { edge_device_id: {
"namespace": namespace, "namespace": namespace,
"is_online": f"{namespace}/{device_id}" in host_node._online_devices, "is_online": f"{namespace}/{edge_device_id}" in host_node._online_devices,
"key": f"{namespace}/{device_id}" if namespace.startswith("/") else f"/{namespace}/{device_id}", "key": f"{namespace}/{edge_device_id}" if namespace.startswith("/") else f"/{namespace}/{edge_device_id}",
"machine_name": host_node.device_machine_names.get(device_id, "未知"), "machine_name": host_node.device_machine_names.get(edge_device_id, "未知"),
} }
for device_id, namespace in host_node.devices_names.items() for edge_device_id, namespace in host_node.devices_names.items()
} }
# 获取已订阅的主题 # 获取已订阅的主题
host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics)) host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics))

View File

@@ -12,6 +12,7 @@ class BasicConfig:
config_path = "" config_path = ""
is_host_mode = True # 从registry.py移动过来 is_host_mode = True # 从registry.py移动过来
slave_no_host = False # 是否跳过rclient.wait_for_service() slave_no_host = False # 是否跳过rclient.wait_for_service()
machine_name = "undefined"
# MQTT配置 # MQTT配置
@@ -100,26 +101,79 @@ def _update_config_from_module(module):
logger.warning("Skipping key file loading, key_file is empty") logger.warning("Skipping key file loading, key_file is empty")
def _update_config_from_env():
prefix = "UNILABOS."
for env_key, env_value in os.environ.items():
if not env_key.startswith(prefix):
continue
try:
key_path = env_key[len(prefix):] # Remove UNILAB_ prefix
class_field = key_path.upper().split(".", 1)
if len(class_field) != 2:
logger.warning(f"[ENV] 环境变量格式不正确:{env_key}")
continue
class_key, field_key = class_field
# 遍历 globals 找匹配类(不区分大小写)
matched_cls = None
for name, obj in globals().items():
if name.upper() == class_key and isinstance(obj, type):
matched_cls = obj
break
if matched_cls is None:
logger.warning(f"[ENV] 未找到类:{class_key}")
continue
# 查找类属性(不区分大小写)
matched_field = None
for attr in dir(matched_cls):
if attr.upper() == field_key:
matched_field = attr
break
if matched_field is None:
logger.warning(f"[ENV] 类 {matched_cls.__name__} 中未找到字段:{field_key}")
continue
current_value = getattr(matched_cls, matched_field)
attr_type = type(current_value)
if attr_type == bool:
value = env_value.lower() in ("true", "1", "yes")
elif attr_type == int:
value = int(env_value)
elif attr_type == float:
value = float(env_value)
else:
value = env_value
setattr(matched_cls, matched_field, value)
logger.info(f"[ENV] 设置 {matched_cls.__name__}.{matched_field} = {value}")
except Exception as e:
logger.warning(f"[ENV] 解析环境变量 {env_key} 失败: {e}")
def load_config(config_path=None): def load_config(config_path=None):
# 如果提供了配置文件路径,从该文件导入配置 # 如果提供了配置文件路径,从该文件导入配置
if config_path: if config_path:
_update_config_from_env() # 允许config_path被env设定后读取
BasicConfig.config_path = os.path.abspath(os.path.dirname(config_path)) BasicConfig.config_path = os.path.abspath(os.path.dirname(config_path))
if not os.path.exists(config_path): if not os.path.exists(config_path):
logger.error(f"配置文件 {config_path} 不存在") logger.error(f"[ENV] 配置文件 {config_path} 不存在")
exit(1) exit(1)
try: try:
module_name = "lab_" + os.path.basename(config_path).replace(".py", "") module_name = "lab_" + os.path.basename(config_path).replace(".py", "")
spec = importlib.util.spec_from_file_location(module_name, config_path) spec = importlib.util.spec_from_file_location(module_name, config_path)
if spec is None: if spec is None:
logger.error(f"配置文件 {config_path} 错误") logger.error(f"[ENV] 配置文件 {config_path} 错误")
return return
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module) # type: ignore spec.loader.exec_module(module) # type: ignore
_update_config_from_module(module) _update_config_from_module(module)
logger.info(f"配置文件 {config_path} 加载成功") logger.info(f"[ENV] 配置文件 {config_path} 加载成功")
except Exception as e: except Exception as e:
logger.error(f"加载配置文件 {config_path} 失败: {e}") logger.error(f"[ENV] 加载配置文件 {config_path} 失败")
traceback.print_exc() traceback.print_exc()
exit(1) exit(1)
else: else:

View File

@@ -1,7 +1,7 @@
import copy import copy
import json import json
import os import os
import traceback import threading
from typing import Optional, Dict, Any, List from typing import Optional, Dict, Any, List
import rclpy import rclpy
@@ -17,7 +17,6 @@ from unilabos.ros.msgs.message_converter import (
convert_to_ros_msg, convert_to_ros_msg,
) )
from unilabos.ros.nodes.presets.host_node import HostNode from unilabos.ros.nodes.presets.host_node import HostNode
from unilabos.ros.x.rclpyx import run_event_loop_in_thread
from unilabos.utils import logger from unilabos.utils import logger
from unilabos.config.config import BasicConfig from unilabos.config.config import BasicConfig
from unilabos.utils.type_check import TypeEncoder from unilabos.utils.type_check import TypeEncoder
@@ -63,16 +62,11 @@ def main(
discovery_interval, discovery_interval,
) )
executor.add_node(host_node) thread = threading.Thread(target=executor.spin, daemon=True, name="host_executor_thread")
# run_event_loop_in_thread() thread.start()
try: while True:
executor.spin() input()
except Exception as e:
logger.error(traceback.format_exc())
print(f"Exception caught: {e}")
finally:
exit()
def slave( def slave(
@@ -97,18 +91,19 @@ def slave(
# else: # else:
# print(f"Warning: Device {device_id} could not be initialized or is not a valid Node") # print(f"Warning: Device {device_id} could not be initialized or is not a valid Node")
machine_name = os.popen("hostname").read().strip() n = Node(f"slaveMachine_{BasicConfig.machine_name}", parameter_overrides=[])
machine_name = "".join([c if c.isalnum() or c == "_" else "_" for c in machine_name])
n = Node(f"slaveMachine_{machine_name}", parameter_overrides=[])
executor.add_node(n) executor.add_node(n)
thread = threading.Thread(target=executor.spin, daemon=True, name="slave_executor_thread")
thread.start()
if not BasicConfig.slave_no_host: if not BasicConfig.slave_no_host:
sclient = n.create_client(SerialCommand, "/node_info_update") sclient = n.create_client(SerialCommand, "/node_info_update")
sclient.wait_for_service() sclient.wait_for_service()
request = SerialCommand.Request() request = SerialCommand.Request()
request.command = json.dumps({ request.command = json.dumps({
"machine_name": machine_name, "machine_name": BasicConfig.machine_name,
"type": "slave", "type": "slave",
"devices_config": devices_config_copy, "devices_config": devices_config_copy,
"registry_config": lab_registry.obtain_registry_device_info() "registry_config": lab_registry.obtain_registry_device_info()
@@ -124,16 +119,8 @@ def slave(
response = rclient.call_async(request).result() response = rclient.call_async(request).result()
logger.info(f"Slave resource added.") logger.info(f"Slave resource added.")
while True:
run_event_loop_in_thread() input()
try:
executor.spin()
except Exception as e:
print(f"Exception caught: {e}")
finally:
exit()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -1,3 +1,4 @@
import json
import threading import threading
import time import time
import traceback import traceback
@@ -13,15 +14,17 @@ from rclpy.action import ActionServer
from rclpy.action.server import ServerGoalHandle from rclpy.action.server import ServerGoalHandle
from rclpy.client import Client from rclpy.client import Client
from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.service import Service
from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type from unilabos.resources.graphio import convert_resources_to_type, convert_resources_from_type
from unilabos.ros.msgs.message_converter import ( from unilabos.ros.msgs.message_converter import (
convert_to_ros_msg, convert_to_ros_msg,
convert_from_ros_msg, convert_from_ros_msg,
convert_from_ros_msg_with_mapping, convert_from_ros_msg_with_mapping,
convert_to_ros_msg_with_mapping, convert_to_ros_msg_with_mapping, ros_action_to_json_schema,
) )
from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList # type: ignore from unilabos_msgs.srv import ResourceAdd, ResourceGet, ResourceDelete, ResourceUpdate, ResourceList, \
SerialCommand # type: ignore
from unilabos_msgs.msg import Resource # type: ignore from unilabos_msgs.msg import Resource # type: ignore
from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker from unilabos.ros.nodes.resource_tracker import DeviceNodeResourceTracker
@@ -29,7 +32,7 @@ from unilabos.ros.x.rclpyx import get_event_loop
from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator from unilabos.ros.utils.driver_creator import ProtocolNodeCreator, PyLabRobotCreator, DeviceClassCreator
from unilabos.utils.async_util import run_async_func from unilabos.utils.async_util import run_async_func
from unilabos.utils.log import info, debug, warning, error, critical, logger from unilabos.utils.log import info, debug, warning, error, critical, logger
from unilabos.utils.type_check import get_type_class from unilabos.utils.type_check import get_type_class, TypeEncoder
T = TypeVar("T") T = TypeVar("T")
@@ -44,19 +47,17 @@ class ROSLoggerAdapter:
@property @property
def identifier(self): def identifier(self):
return f"{self.namespace}/{self.node_name}" return f"{self.namespace}"
def __init__(self, ros_logger, node_name, namespace): def __init__(self, ros_logger, namespace):
""" """
初始化日志适配器 初始化日志适配器
Args: Args:
ros_logger: ROS2日志记录器 ros_logger: ROS2日志记录器
node_name: 节点名称
namespace: 命名空间 namespace: 命名空间
""" """
self.ros_logger = ros_logger self.ros_logger = ros_logger
self.node_name = node_name
self.namespace = namespace self.namespace = namespace
self.level_2_logger_func = { self.level_2_logger_func = {
"info": info, "info": info,
@@ -258,9 +259,9 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.lab_logger().critical("资源跟踪器未初始化,请检查") self.lab_logger().critical("资源跟踪器未初始化,请检查")
# 创建自定义日志记录器 # 创建自定义日志记录器
self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.node_name, self.namespace) self._lab_logger = ROSLoggerAdapter(self.get_logger(), self.namespace)
self._action_servers = {} self._action_servers: Dict[str, ActionServer] = {}
self._property_publishers = {} self._property_publishers = {}
self._status_types = status_types self._status_types = status_types
self._action_value_mappings = action_value_mappings self._action_value_mappings = action_value_mappings
@@ -284,7 +285,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
self.create_ros_action_server(action_name, action_value_mapping) self.create_ros_action_server(action_name, action_value_mapping)
# 创建线程池执行器 # 创建线程池执行器
self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1)) self._executor = ThreadPoolExecutor(max_workers=max(len(action_value_mappings), 1), thread_name_prefix=f"ROSDevice{self.device_id}")
# 创建资源管理客户端 # 创建资源管理客户端
self._resource_clients: Dict[str, Client] = { self._resource_clients: Dict[str, Client] = {
@@ -295,6 +296,18 @@ class BaseROS2DeviceNode(Node, Generic[T]):
"resource_list": self.create_client(ResourceList, "/resources/list"), "resource_list": self.create_client(ResourceList, "/resources/list"),
} }
def query_host_name_cb(req, res):
self.register_device()
self.lab_logger().info("Host要求重新注册当前节点")
res.response = ""
return res
self._service_server: Dict[str, Service] = {
"query_host_name": self.create_service(
SerialCommand, f"/srv{self.namespace}/query_host_name", query_host_name_cb, callback_group=self.callback_group
),
}
# 向全局在线设备注册表添加设备信息 # 向全局在线设备注册表添加设备信息
self.register_device() self.register_device()
rclpy.get_global_executor().add_node(self) rclpy.get_global_executor().add_node(self)
@@ -318,6 +331,31 @@ class BaseROS2DeviceNode(Node, Generic[T]):
) )
# 加入全局注册表 # 加入全局注册表
registered_devices[self.device_id] = device_info registered_devices[self.device_id] = device_info
from unilabos.config.config import BasicConfig
if not BasicConfig.is_host_mode:
sclient = self.create_client(SerialCommand, "/node_info_update")
# 启动线程执行发送任务
threading.Thread(
target=self.send_slave_node_info,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_send_slave_node_info"
).start()
def send_slave_node_info(self, sclient):
sclient.wait_for_service()
request = SerialCommand.Request()
from unilabos.config.config import BasicConfig
request.command = json.dumps({
"SYNC_SLAVE_NODE_INFO": {
"machine_name": BasicConfig.machine_name,
"type": "slave",
"edge_device_id": self.device_id
}}, ensure_ascii=False, cls=TypeEncoder)
# 发送异步请求并等待结果
future = sclient.call_async(request)
response = future.result()
def lab_logger(self): def lab_logger(self):
""" """

View File

@@ -27,6 +27,7 @@ from unilabos.ros.msgs.message_converter import (
) )
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.ros.nodes.presets.controller_node import ControllerNode from unilabos.ros.nodes.presets.controller_node import ControllerNode
from unilabos.utils.type_check import TypeEncoder
class HostNode(BaseROS2DeviceNode): class HostNode(BaseROS2DeviceNode):
@@ -98,7 +99,7 @@ class HostNode(BaseROS2DeviceNode):
# 创建设备、动作客户端和目标存储 # 创建设备、动作客户端和目标存储
self.devices_names: Dict[str, str] = {} # 存储设备名称和命名空间的映射 self.devices_names: Dict[str, str] = {} # 存储设备名称和命名空间的映射
self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例 self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例
self.device_machine_names: Dict[str, str] = {} # 存储设备ID到机器名称的映射 self.device_machine_names: Dict[str, str] = {device_id: "本地", } # 存储设备ID到机器名称的映射
self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例 self._action_clients: Dict[str, ActionClient] = {} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = ( self._action_value_mappings: Dict[str, Dict] = (
{} {}
@@ -160,6 +161,13 @@ class HostNode(BaseROS2DeviceNode):
self.lab_logger().info("[Host Node] Host node initialized.") self.lab_logger().info("[Host Node] Host node initialized.")
HostNode._ready_event.set() HostNode._ready_event.set()
def _send_re_register(self, sclient):
sclient.wait_for_service()
request = SerialCommand.Request()
request.command = ""
future = sclient.call_async(request)
response = future.result()
def _discover_devices(self) -> None: def _discover_devices(self) -> None:
""" """
发现网络中的设备 发现网络中的设备
@@ -176,23 +184,37 @@ class HostNode(BaseROS2DeviceNode):
current_devices = set() current_devices = set()
for device_id, namespace in nodes_and_names: for device_id, namespace in nodes_and_names:
if not namespace.startswith("/devices"): if not namespace.startswith("/devices/"):
continue continue
edge_device_id = namespace[9:]
# 将设备添加到当前设备集合 # 将设备添加到当前设备集合
device_key = f"{namespace}/{device_id}" device_key = f"{namespace}/{edge_device_id}" # namespace已经包含device_id了这里复写一遍
current_devices.add(device_key) current_devices.add(device_key)
# 如果是新设备记录并创建ActionClient # 如果是新设备记录并创建ActionClient
if device_id not in self.devices_names: if edge_device_id not in self.devices_names:
self.lab_logger().info(f"[Host Node] Discovered new device: {device_key}") self.lab_logger().info(f"[Host Node] Discovered new device: {device_key}")
self.devices_names[device_id] = namespace self.devices_names[edge_device_id] = namespace
self._create_action_clients_for_device(device_id, namespace) self._create_action_clients_for_device(device_id, namespace)
self._online_devices.add(device_key) self._online_devices.add(device_key)
sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name")
threading.Thread(
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
).start()
elif device_key not in self._online_devices: elif device_key not in self._online_devices:
# 设备重新上线 # 设备重新上线
self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}") self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}")
self._online_devices.add(device_key) self._online_devices.add(device_key)
sclient = self.create_client(SerialCommand, f"/srv{namespace}/query_host_name")
threading.Thread(
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_query_host_name_{namespace}"
).start()
# 检测离线设备 # 检测离线设备
offline_devices = self._online_devices - current_devices offline_devices = self._online_devices - current_devices
@@ -234,17 +256,22 @@ class HostNode(BaseROS2DeviceNode):
self._action_clients[action_id] = ActionClient( self._action_clients[action_id] = ActionClient(
self, action_type, action_id, callback_group=self.callback_group self, action_type, action_id, callback_group=self.callback_group
) )
self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") self.lab_logger().debug(f"[Host Node] Created ActionClient (Discovery): {action_id}")
action_name = action_id[len(namespace) + 1:]
edge_device_id = namespace[9:]
from unilabos.app.mq import mqtt_client from unilabos.app.mq import mqtt_client
info_with_schema = ros_action_to_json_schema(action_type) info_with_schema = ros_action_to_json_schema(action_type)
mqtt_client.publish_actions(action_id, info_with_schema) mqtt_client.publish_actions(action_name, {
"device_id": edge_device_id,
"action_name": action_name,
"schema": info_with_schema,
})
except Exception as e: except Exception as e:
self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}") self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}")
def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None: def initialize_device(self, device_id: str, device_config: Dict[str, Any]) -> None:
""" """
根据配置初始化设备 根据配置初始化设备
此函数根据提供的设备配置动态导入适当的设备类并创建其实例。 此函数根据提供的设备配置动态导入适当的设备类并创建其实例。
同时为设备的动作值映射设置动作客户端。 同时为设备的动作值映射设置动作客户端。
@@ -260,7 +287,7 @@ class HostNode(BaseROS2DeviceNode):
if d is None: if d is None:
return return
# noinspection PyProtectedMember # noinspection PyProtectedMember
self.devices_names[device_id] = d._ros_node.namespace self.devices_names[device_id] = d._ros_node.namespace # 这里不涉及二级device_id
self.device_machine_names[device_id] = "本地" self.device_machine_names[device_id] = "本地"
self.devices_instances[device_id] = d self.devices_instances[device_id] = d
# noinspection PyProtectedMember # noinspection PyProtectedMember
@@ -269,14 +296,17 @@ class HostNode(BaseROS2DeviceNode):
if action_id not in self._action_clients: if action_id not in self._action_clients:
action_type = action_value_mapping["type"] action_type = action_value_mapping["type"]
self._action_clients[action_id] = ActionClient(self, action_type, action_id) self._action_clients[action_id] = ActionClient(self, action_type, action_id)
self.lab_logger().debug(f"[Host Node] Created ActionClient: {action_id}") self.lab_logger().debug(f"[Host Node] Created ActionClient (Local): {action_id}") # 子设备再创建用的是Discover发现的
from unilabos.app.mq import mqtt_client from unilabos.app.mq import mqtt_client
info_with_schema = ros_action_to_json_schema(action_type) info_with_schema = ros_action_to_json_schema(action_type)
mqtt_client.publish_actions(action_id, info_with_schema) mqtt_client.publish_actions(action_name, {
"device_id": device_id,
"action_name": action_name,
"schema": info_with_schema,
})
else: else:
self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.") self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.")
device_key = f"{self.devices_names[device_id]}/{device_id}" device_key = f"{self.devices_names[device_id]}/{device_id}" # 这里不涉及二级device_id
# 添加到在线设备列表 # 添加到在线设备列表
self._online_devices.add(device_key) self._online_devices.add(device_key)
@@ -298,8 +328,8 @@ class HostNode(BaseROS2DeviceNode):
# 解析设备名和属性名 # 解析设备名和属性名
parts = topic.split("/") parts = topic.split("/")
if len(parts) >= 4: if len(parts) >= 4: # 可能有ProtocolNode创建更长的设备
device_id = parts[-2] device_id = "/".join(parts[2:-1])
property_name = parts[-1] property_name = parts[-1]
# 初始化设备状态字典 # 初始化设备状态字典
@@ -526,21 +556,19 @@ class HostNode(BaseROS2DeviceNode):
from unilabos.app.mq import mqtt_client from unilabos.app.mq import mqtt_client
info = json.loads(request.command) info = json.loads(request.command)
machine_name = info["machine_name"] if "SYNC_SLAVE_NODE_INFO" in info:
devices_config = info["devices_config"] info = info["SYNC_SLAVE_NODE_INFO"]
registry_config = info["registry_config"] machine_name = info["machine_name"]
edge_device_id = info["edge_device_id"]
# 更新设备机器名称映射 self.device_machine_names[edge_device_id] = machine_name
for device_id in devices_config.keys(): else:
self.device_machine_names[device_id] = machine_name registry_config = info["registry_config"]
self.lab_logger().debug(f"[Host Node] Updated machine name for device {device_id}: {machine_name}") for device_config in registry_config:
mqtt_client.publish_registry(device_config["id"], device_config)
for device_config in registry_config: self.lab_logger().debug(f"[Host Node] Node info update: {info}")
mqtt_client.publish_registry(device_config["id"], device_config)
self.lab_logger().info(f"[Host Node] Node info update: {info}")
response.response = "OK" response.response = "OK"
except Exception as e: except Exception as e:
self.lab_logger().error(f"[Host Node] Error updating node info: {str(e)}") self.lab_logger().error(f"[Host Node] Error updating node info: {e.args}")
response.response = "ERROR" response.response = "ERROR"
return response return response

View File

@@ -224,8 +224,7 @@ class PyLabRobotCreator(DeviceClassCreator[T]):
if hasattr(self.device_instance, "setup") and asyncio.iscoroutinefunction(getattr(self.device_instance, "setup")): if hasattr(self.device_instance, "setup") and asyncio.iscoroutinefunction(getattr(self.device_instance, "setup")):
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
ROS2DeviceNode.run_async_func(getattr(self.device_instance, "setup")).add_done_callback(lambda x: logger.debug(f"PyLabRobot设备实例 {self.device_instance} 设置完成")) ROS2DeviceNode.run_async_func(getattr(self.device_instance, "setup")).add_done_callback(lambda x: logger.debug(f"PyLabRobot设备实例 {self.device_instance} 设置完成"))
# 2486229810384
#2486232539792
class ProtocolNodeCreator(DeviceClassCreator[T]): class ProtocolNodeCreator(DeviceClassCreator[T]):
""" """