mirror of
https://github.com/dptech-corp/Uni-Lab-OS.git
synced 2025-12-17 21:11:12 +00:00
* 修复嵌套节点,mq发送任务id出错的问题 修正一处注册表命名错误 * 修复本地看板Action显示不全 修复本地子设备没有机器名称的bug * 补全vacuum_pump.mock注册信息
68 lines
2.5 KiB
Python
68 lines
2.5 KiB
Python
"""
|
||
主机节点工具模块
|
||
|
||
提供与主机节点相关的工具函数
|
||
"""
|
||
|
||
import time
|
||
from typing import Dict, Any
|
||
|
||
from unilabos.config.config import BasicConfig
|
||
from unilabos.ros.nodes.presets.host_node import HostNode
|
||
from unilabos.app.web.utils.action_utils import get_action_info
|
||
|
||
|
||
def get_host_node_info() -> Dict[str, Any]:
|
||
"""
|
||
获取主机节点信息
|
||
|
||
尝试获取HostNode实例并提取其设备、主题和动作客户端信息
|
||
|
||
Returns:
|
||
Dict: 包含主机节点信息的字典
|
||
"""
|
||
host_info = {"available": False, "devices": {}, "subscribed_topics": [], "action_clients": {}}
|
||
if not BasicConfig.is_host_mode:
|
||
return host_info
|
||
# 尝试获取HostNode实例,设置超时为0秒
|
||
host_node = HostNode.get_instance(0)
|
||
if not host_node:
|
||
return host_info
|
||
host_info["available"] = True
|
||
host_info["devices"] = {
|
||
edge_device_id: {
|
||
"namespace": namespace,
|
||
"is_online": f"{namespace}/{edge_device_id}" in host_node._online_devices,
|
||
"key": f"{namespace}/{edge_device_id}" if namespace.startswith("/") else f"/{namespace}/{edge_device_id}",
|
||
"machine_name": host_node.device_machine_names.get(edge_device_id, "未知"),
|
||
}
|
||
for edge_device_id, namespace in host_node.devices_names.items()
|
||
}
|
||
# 获取已订阅的主题
|
||
host_info["subscribed_topics"] = sorted(list(host_node._subscribed_topics))
|
||
# 获取动作客户端信息
|
||
for action_id, client in host_node._action_clients.items():
|
||
host_info["action_clients"][action_id] = get_action_info(client, full_name=action_id)
|
||
|
||
# 获取设备状态
|
||
host_info["device_status"] = host_node.device_status
|
||
|
||
# 添加设备状态更新时间戳
|
||
current_time = time.time()
|
||
host_info["device_status_timestamps"] = {}
|
||
for device_id, properties in host_node.device_status_timestamps.items():
|
||
host_info["device_status_timestamps"][device_id] = {}
|
||
for prop_name, timestamp in properties.items():
|
||
if timestamp > 0: # 只处理有效的时间戳
|
||
host_info["device_status_timestamps"][device_id][prop_name] = {
|
||
"timestamp": timestamp,
|
||
"elapsed": round(current_time - timestamp, 2), # 计算经过的时间(秒)
|
||
}
|
||
else:
|
||
host_info["device_status_timestamps"][device_id][prop_name] = {
|
||
"timestamp": 0,
|
||
"elapsed": -1, # 表示未曾更新过
|
||
}
|
||
|
||
return host_info
|