支持local_config启动 添加注册表description字段 (#13)

Closes #11

* Update README and MQTTClient for installation instructions and code improvements

* feat: 支持local_config启动
add: 增加对crt path的说明,为传入config.py的相对路径
move: web component

* add: registry description

---------

Co-authored-by: Harvey Que <Q-Query@outlook.com>
This commit is contained in:
Xuwznln
2025-04-20 18:24:45 +08:00
committed by GitHub
parent 22a02bdb06
commit 35ada068cc
39 changed files with 114 additions and 34 deletions

184
unilabos/app/web/pages.py Normal file
View File

@@ -0,0 +1,184 @@
"""
Web页面模块
提供系统Web界面的页面定义
"""
import json
import os
import sys
from pathlib import Path
from typing import Dict
from fastapi import APIRouter, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse
from jinja2 import Environment, FileSystemLoader
from unilabos.config.config import BasicConfig
from unilabos.registry.registry import lab_registry
from unilabos.app.mq import mqtt_client
from unilabos.ros.msgs.message_converter import msg_converter_manager
from unilabos.utils.log import error
from unilabos.utils.type_check import TypeEncoder
from unilabos.app.web.utils.device_utils import get_registry_info
from unilabos.app.web.utils.host_utils import get_host_node_info
from unilabos.app.web.utils.ros_utils import get_ros_node_info, update_ros_node_info
# 设置Jinja2模板环境
template_dir = Path(__file__).parent / "templates"
env = Environment(loader=FileSystemLoader(template_dir))
def setup_web_pages(router: APIRouter) -> None:
"""
设置Web页面路由
Args:
router: FastAPI路由器实例
"""
# 在web服务启动时尝试初始化ROS节点信息
update_ros_node_info()
@router.get("/", response_class=HTMLResponse, summary="Home Page")
async def home_page() -> str:
"""
首页显示所有可用的API路由
Returns:
HTMLResponse: 渲染后的HTML页面
"""
try:
# 收集所有路由
routes = []
for route in router.routes:
if hasattr(route, "methods") and hasattr(route, "path"):
for method in list(getattr(route, "methods", [])):
path = getattr(route, "path", "")
# 只显示GET方法的路由作为链接
if method == "GET":
name = getattr(route, "name", "") or path
summary = getattr(route, "summary", "") or name
routes.append({"method": method, "path": path, "name": name, "summary": summary})
# 使用模板渲染页面
template = env.get_template("home.html")
html = template.render(routes=routes)
return html
except Exception as e:
error(f"生成主页时出错: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error generating home page: {str(e)}")
@router.get("/status", response_class=HTMLResponse, summary="System Status")
async def status_page() -> str:
"""
状态页面,显示系统状态信息
Returns:
HTMLResponse: 渲染后的HTML页面
"""
try:
# 准备设备数据
devices = []
resources = []
modules = {"names": [], "classes": [], "displayed_count": 0, "total_count": 0}
# 获取在线设备信息
ros_node_info = get_ros_node_info()
# 获取主机节点信息
host_node_info = get_host_node_info()
# 获取Registry路径信息
registry_info = get_registry_info()
# 获取已加载的设备
if lab_registry:
# 设备类型
for device_id, device_info in lab_registry.device_type_registry.items():
msg = {
"id": device_id,
"name": device_info.get("name", "未命名"),
"file_path": device_info.get("file_path", ""),
"class_json": json.dumps(
device_info.get("class", {}), indent=4, ensure_ascii=False, cls=TypeEncoder
),
}
mqtt_client.publish_registry(device_id, device_info)
devices.append(msg)
# 资源类型
for resource_id, resource_info in lab_registry.resource_type_registry.items():
resources.append(
{
"id": resource_id,
"name": resource_info.get("name", "未命名"),
"file_path": resource_info.get("file_path", ""),
}
)
# 获取导入的模块
if msg_converter_manager:
modules["names"] = msg_converter_manager.list_modules()
all_classes = [i for i in msg_converter_manager.list_classes() if "." in i]
modules["total_count"] = len(all_classes)
modules["classes"] = all_classes
# 使用模板渲染页面
template = env.get_template("status.html")
html = template.render(
devices=devices,
resources=resources,
modules=modules,
is_host_mode=BasicConfig.is_host_mode,
registry_info=registry_info,
ros_node_info=ros_node_info,
host_node_info=host_node_info,
)
return html
except Exception as e:
error(f"生成状态页面时出错: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error generating status page: {str(e)}")
@router.get("/open-folder", response_class=JSONResponse, summary="Open Local Folder")
async def open_folder(path: str = "") -> Dict[str, str]:
"""
打开本地文件夹
Args:
path: 要打开的文件夹路径
Returns:
JSONResponse: 操作结果
Raises:
HTTPException: 如果路径为空或不存在
"""
if not path:
return {"status": "error", "message": "Path is empty"}
try:
# 规范化路径
norm_path = os.path.normpath(path)
# 如果是文件路径,获取其目录
if os.path.isfile(norm_path):
norm_path = os.path.dirname(norm_path)
# 检查路径是否存在
if not os.path.exists(norm_path):
return {"status": "error", "message": f"Path does not exist: {norm_path}"}
# Windows
if os.name == "nt":
os.startfile(norm_path)
# macOS
elif sys.platform == "darwin":
os.system(f'open "{norm_path}"')
# Linux
else:
os.system(f'xdg-open "{norm_path}"')
return {"status": "success", "message": f"Opened folder: {norm_path}"}
except Exception as e:
error(f"打开文件夹时出错: {str(e)}")
return {"status": "error", "message": f"Failed to open folder: {str(e)}"}