From ffc583e9d5bec9eae29027d992a7c00f36d0ee45 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Wed, 26 Nov 2025 19:03:31 +0800 Subject: [PATCH] Add backend api and update doc --- docs/developer_guide/http_api.md | 334 ++++++++++++++ docs/index.md | 14 + unilabos/app/model.py | 36 +- unilabos/app/web/api.py | 89 +++- unilabos/app/web/controler.py | 45 -- unilabos/app/web/controller.py | 587 ++++++++++++++++++++++++ unilabos/ros/nodes/presets/host_node.py | 10 + 7 files changed, 1049 insertions(+), 66 deletions(-) create mode 100644 docs/developer_guide/http_api.md delete mode 100644 unilabos/app/web/controler.py create mode 100644 unilabos/app/web/controller.py diff --git a/docs/developer_guide/http_api.md b/docs/developer_guide/http_api.md new file mode 100644 index 00000000..a1f548df --- /dev/null +++ b/docs/developer_guide/http_api.md @@ -0,0 +1,334 @@ +# HTTP API 指南 + +本文档介绍如何通过 HTTP API 与 Uni-Lab-OS 进行交互,包括查询设备、提交任务和获取结果。 + +## 概述 + +Uni-Lab-OS 提供 RESTful HTTP API,允许外部系统通过标准 HTTP 请求控制实验室设备。API 基于 FastAPI 构建,默认运行在 `http://localhost:8002`。 + +### 基础信息 + +- **Base URL**: `http://localhost:8002/api/v1` +- **Content-Type**: `application/json` +- **响应格式**: JSON + +### 通用响应结构 + +```json +{ + "code": 0, + "data": { ... }, + "message": "success" +} +``` + +| 字段 | 类型 | 说明 | +| --------- | ------ | ------------------ | +| `code` | int | 状态码,0 表示成功 | +| `data` | object | 响应数据 | +| `message` | string | 响应消息 | + +## 快速开始 + +以下是一个完整的工作流示例:查询设备 → 获取动作 → 提交任务 → 获取结果。 + +### 步骤 1: 获取在线设备 + +```bash +curl -X GET "http://localhost:8002/api/v1/online-devices" +``` + +**响应示例**: + +```json +{ + "code": 0, + "data": { + "online_devices": { + "host_node": { + "device_key": "/host_node", + "namespace": "", + "machine_name": "本地", + "uuid": "xxx-xxx-xxx", + "node_name": "host_node" + } + }, + "total_count": 1, + "timestamp": 1732612345.123 + }, + "message": "success" +} +``` + +### 步骤 2: 获取设备可用动作 + +```bash +curl -X GET "http://localhost:8002/api/v1/devices/host_node/actions" +``` + +**响应示例**: + +```json +{ + "code": 0, + "data": { + "device_id": "host_node", + "actions": { + "test_latency": { + "type_name": "unilabos_msgs.action._empty_in.EmptyIn", + "type_name_convert": "unilabos_msgs/action/_empty_in/EmptyIn", + "action_path": "/devices/host_node/test_latency", + "goal_info": "{}", + "is_busy": false, + "current_job_id": null + }, + "create_resource": { + "type_name": "unilabos_msgs.action._resource_create_from_outer_easy.ResourceCreateFromOuterEasy", + "action_path": "/devices/host_node/create_resource", + "goal_info": "{res_id: '', device_id: '', class_name: '', ...}", + "is_busy": false, + "current_job_id": null + } + }, + "action_count": 5 + }, + "message": "success" +} +``` + +**动作状态字段说明**: + +| 字段 | 说明 | +| ---------------- | ----------------------------- | +| `type_name` | 动作类型的完整名称 | +| `action_path` | ROS2 动作路径 | +| `goal_info` | 动作参数模板 | +| `is_busy` | 动作是否正在执行 | +| `current_job_id` | 当前执行的任务 ID(如果繁忙) | + +### 步骤 3: 提交任务 + +```bash +curl -X POST "http://localhost:8002/api/v1/job/add" \ + -H "Content-Type: application/json" \ + -d '{"device_id":"host_node","action":"test_latency","action_args":{}}' +``` + +**请求体**: + +```json +{ + "device_id": "host_node", + "action": "test_latency", + "action_args": {} +} +``` + +**请求参数说明**: + +| 字段 | 类型 | 必填 | 说明 | +| ------------- | ------ | ---- | ---------------------------------- | +| `device_id` | string | ✓ | 目标设备 ID | +| `action` | string | ✓ | 动作名称 | +| `action_args` | object | ✓ | 动作参数(根据动作类型不同而变化) | + +**响应示例**: + +```json +{ + "code": 0, + "data": { + "jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd", + "status": 1, + "result": {} + }, + "message": "success" +} +``` + +**任务状态码**: + +| 状态码 | 含义 | 说明 | +| ------ | --------- | ------------------------------ | +| 0 | UNKNOWN | 未知状态 | +| 1 | ACCEPTED | 任务已接受,等待执行 | +| 2 | EXECUTING | 任务执行中 | +| 3 | CANCELING | 任务取消中 | +| 4 | SUCCEEDED | 任务成功完成 | +| 5 | CANCELED | 任务已取消 | +| 6 | ABORTED | 任务中止(设备繁忙或执行失败) | + +### 步骤 4: 查询任务状态和结果 + +```bash +curl -X GET "http://localhost:8002/api/v1/job/b6acb586-733a-42ab-9f73-55c9a52aa8bd/status" +``` + +**响应示例(执行中)**: + +```json +{ + "code": 0, + "data": { + "jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd", + "status": 2, + "result": {} + }, + "message": "success" +} +``` + +**响应示例(执行完成)**: + +```json +{ + "code": 0, + "data": { + "jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd", + "status": 4, + "result": { + "error": "", + "suc": true, + "return_value": { + "avg_rtt_ms": 103.99, + "avg_time_diff_ms": 7181.55, + "max_time_error_ms": 7210.57, + "task_delay_ms": -1, + "raw_delay_ms": 33.19, + "test_count": 5, + "status": "success" + } + } + }, + "message": "success" +} +``` + +> **注意**: 任务结果在首次查询后会被自动删除,请确保保存返回的结果数据。 + +## API 端点列表 + +### 设备相关 + +| 端点 | 方法 | 说明 | +| ---------------------------------------------------------- | ---- | ---------------------- | +| `/api/v1/online-devices` | GET | 获取在线设备列表 | +| `/api/v1/devices` | GET | 获取设备配置 | +| `/api/v1/devices/{device_id}/actions` | GET | 获取指定设备的可用动作 | +| `/api/v1/devices/{device_id}/actions/{action_name}/schema` | GET | 获取动作参数 Schema | +| `/api/v1/actions` | GET | 获取所有设备的可用动作 | + +### 任务相关 + +| 端点 | 方法 | 说明 | +| ----------------------------- | ---- | ------------------ | +| `/api/v1/job/add` | POST | 提交新任务 | +| `/api/v1/job/{job_id}/status` | GET | 查询任务状态和结果 | + +### 资源相关 + +| 端点 | 方法 | 说明 | +| ------------------- | ---- | ------------ | +| `/api/v1/resources` | GET | 获取资源列表 | + +## 常见动作示例 + +### test_latency - 延迟测试 + +测试系统延迟,无需参数。 + +```bash +curl -X POST "http://localhost:8002/api/v1/job/add" \ + -H "Content-Type: application/json" \ + -d '{"device_id":"host_node","action":"test_latency","action_args":{}}' +``` + +### create_resource - 创建资源 + +在设备上创建新资源。 + +```bash +curl -X POST "http://localhost:8002/api/v1/job/add" \ + -H "Content-Type: application/json" \ + -d '{ + "device_id": "host_node", + "action": "create_resource", + "action_args": { + "res_id": "my_plate", + "device_id": "host_node", + "class_name": "Plate", + "parent": "deck", + "bind_locations": {"x": 0, "y": 0, "z": 0} + } +}' +``` + +## 错误处理 + +### 设备繁忙 + +当设备正在执行其他任务时,提交新任务会返回 `status: 6`(ABORTED): + +```json +{ + "code": 0, + "data": { + "jobId": "xxx", + "status": 6, + "result": {} + }, + "message": "success" +} +``` + +此时应等待当前任务完成后重试,或使用 `/devices/{device_id}/actions` 检查动作的 `is_busy` 状态。 + +### 参数错误 + +```json +{ + "code": 2002, + "data": { ... }, + "message": "device_id is required" +} +``` + +## 轮询策略 + +推荐的任务状态轮询策略: + +```python +import requests +import time + +def wait_for_job(job_id, timeout=60, interval=0.5): + """等待任务完成并返回结果""" + start_time = time.time() + + while time.time() - start_time < timeout: + response = requests.get(f"http://localhost:8002/api/v1/job/{job_id}/status") + data = response.json()["data"] + + status = data["status"] + if status in (4, 5, 6): # SUCCEEDED, CANCELED, ABORTED + return data + + time.sleep(interval) + + raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds") + +# 使用示例 +response = requests.post( + "http://localhost:8002/api/v1/job/add", + json={"device_id": "host_node", "action": "test_latency", "action_args": {}} +) +job_id = response.json()["data"]["jobId"] +result = wait_for_job(job_id) +print(result) +``` + +## 相关文档 + +- [设备注册指南](add_device.md) +- [动作定义指南](add_action.md) +- [网络架构概述](networking_overview.md) diff --git a/docs/index.md b/docs/index.md index e795dbcf..6326bb8c 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,3 +7,17 @@ Uni-Lab-OS 是一个开源的实验室自动化操作系统,提供统一的设 intro.md ``` + +## 开发者指南 + +```{toctree} +:maxdepth: 2 + +developer_guide/http_api.md +developer_guide/networking_overview.md +developer_guide/add_device.md +developer_guide/add_action.md +developer_guide/add_registry.md +developer_guide/add_yaml.md +developer_guide/action_includes.md +``` diff --git a/unilabos/app/model.py b/unilabos/app/model.py index a7c199c9..6f40e731 100644 --- a/unilabos/app/model.py +++ b/unilabos/app/model.py @@ -51,21 +51,25 @@ class Resp(BaseModel): class JobAddReq(BaseModel): device_id: str = Field(examples=["Gripper"], description="device id") action: str = Field(examples=["_execute_driver_command_async"], description="action name", default="") - action_type: str = Field(examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action name", default="") - action_args: dict = Field(examples=[{'string': 'string'}], description="action name", default="") - task_id: str = Field(examples=["task_id"], description="task uuid") - job_id: str = Field(examples=["job_id"], description="goal uuid") - node_id: str = Field(examples=["node_id"], description="node uuid") - server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info") + action_type: str = Field( + examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default="" + ) + action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict) + task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="") + job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="") + node_id: str = Field(examples=["node_id"], description="node uuid", default="") + server_info: dict = Field( + examples=[{"send_timestamp": 1717000000.0}], + description="server info (auto-generated if empty)", + default_factory=dict, + ) - data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}], default={}) + data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}], default_factory=dict) class JobStepFinishReq(BaseModel): token: str = Field(examples=["030944"], description="token") - request_time: str = Field( - examples=["2024-12-12 12:12:12.xxx"], description="requestTime" - ) + request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime") data: dict = Field( examples=[ { @@ -83,9 +87,7 @@ class JobStepFinishReq(BaseModel): class JobPreintakeFinishReq(BaseModel): token: str = Field(examples=["030944"], description="token") - request_time: str = Field( - examples=["2024-12-12 12:12:12.xxx"], description="requestTime" - ) + request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime") data: dict = Field( examples=[ { @@ -102,9 +104,7 @@ class JobPreintakeFinishReq(BaseModel): class JobFinishReq(BaseModel): token: str = Field(examples=["030944"], description="token") - request_time: str = Field( - examples=["2024-12-12 12:12:12.xxx"], description="requestTime" - ) + request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime") data: dict = Field( examples=[ { @@ -133,6 +133,10 @@ class JobData(BaseModel): default=0, description="0:UNKNOWN, 1:ACCEPTED, 2:EXECUTING, 3:CANCELING, 4:SUCCEEDED, 5:CANCELED, 6:ABORTED", ) + result: dict = Field( + default_factory=dict, + description="Job result data (available when status is SUCCEEDED/CANCELED/ABORTED)", + ) class JobStatusResp(Resp): diff --git a/unilabos/app/web/api.py b/unilabos/app/web/api.py index 868de44a..0f6077c8 100644 --- a/unilabos/app/web/api.py +++ b/unilabos/app/web/api.py @@ -9,13 +9,22 @@ import asyncio import yaml -from unilabos.app.web.controler import devices, job_add, job_info +from unilabos.app.web.controller import ( + devices, + job_add, + job_info, + get_online_devices, + get_device_actions, + get_action_schema, + get_all_available_actions, +) from unilabos.app.model import ( Resp, RespCode, JobStatusResp, JobAddResp, JobAddReq, + JobData, ) from unilabos.app.web.utils.host_utils import get_host_node_info from unilabos.registry.registry import lab_registry @@ -1234,6 +1243,65 @@ def get_devices(): return Resp(data=dict(data)) +@api.get("/online-devices", summary="Online devices list", response_model=Resp) +def api_get_online_devices(): + """获取在线设备列表 + + 返回当前在线的设备列表,包含设备ID、命名空间、机器名等信息 + """ + isok, data = get_online_devices() + if not isok: + return Resp(code=RespCode.ErrorHostNotInit, message=data.get("error", "Unknown error")) + + return Resp(data=data) + + +@api.get("/devices/{device_id}/actions", summary="Device actions list", response_model=Resp) +def api_get_device_actions(device_id: str): + """获取设备可用的动作列表 + + Args: + device_id: 设备ID + + 返回指定设备的所有可用动作,包含动作名称、类型、是否繁忙等信息 + """ + isok, data = get_device_actions(device_id) + if not isok: + return Resp(code=RespCode.ErrorInvalidReq, message=data.get("error", "Unknown error")) + + return Resp(data=data) + + +@api.get("/devices/{device_id}/actions/{action_name}/schema", summary="Action schema", response_model=Resp) +def api_get_action_schema(device_id: str, action_name: str): + """获取动作的Schema详情 + + Args: + device_id: 设备ID + action_name: 动作名称 + + 返回动作的参数Schema、默认值、类型等详细信息 + """ + isok, data = get_action_schema(device_id, action_name) + if not isok: + return Resp(code=RespCode.ErrorInvalidReq, message=data.get("error", "Unknown error")) + + return Resp(data=data) + + +@api.get("/actions", summary="All available actions", response_model=Resp) +def api_get_all_actions(): + """获取所有设备的可用动作 + + 返回所有已注册设备的动作列表,包含设备信息和各动作的状态 + """ + isok, data = get_all_available_actions() + if not isok: + return Resp(code=RespCode.ErrorHostNotInit, message=data.get("error", "Unknown error")) + + return Resp(data=data) + + @api.get("/job/{id}/status", summary="Job status", response_model=JobStatusResp) def job_status(id: str): """获取任务状态""" @@ -1244,11 +1312,22 @@ def job_status(id: str): @api.post("/job/add", summary="Create job", response_model=JobAddResp) def post_job_add(req: JobAddReq): """创建任务""" - device_id = req.device_id - if not req.data: - return Resp(code=RespCode.ErrorInvalidReq, message="Invalid request data") + # 检查必要参数:device_id 和 action + if not req.device_id: + return JobAddResp( + data=JobData(jobId="", status=6), + code=RespCode.ErrorInvalidReq, + message="device_id is required", + ) + + action_name = req.data.get("action", req.action) if req.data else req.action + if not action_name: + return JobAddResp( + data=JobData(jobId="", status=6), + code=RespCode.ErrorInvalidReq, + message="action is required", + ) - req.device_id = device_id data = job_add(req) return JobAddResp(data=data) diff --git a/unilabos/app/web/controler.py b/unilabos/app/web/controler.py deleted file mode 100644 index d23470fe..00000000 --- a/unilabos/app/web/controler.py +++ /dev/null @@ -1,45 +0,0 @@ - -import json -import traceback -import uuid -from unilabos.app.model import JobAddReq, JobData -from unilabos.ros.nodes.presets.host_node import HostNode -from unilabos.utils.type_check import serialize_result_info - - -def get_resources() -> tuple: - if HostNode.get_instance() is None: - return False, "Host node not initialized" - - return True, HostNode.get_instance().resources_config - -def devices() -> tuple: - if HostNode.get_instance() is None: - return False, "Host node not initialized" - - return True, HostNode.get_instance().devices_config - -def job_info(id: str): - get_goal_status = HostNode.get_instance().get_goal_status(id) - return JobData(jobId=id, status=get_goal_status) - -def job_add(req: JobAddReq) -> JobData: - if req.job_id is None: - req.job_id = str(uuid.uuid4()) - action_name = req.data["action"] - action_type = req.data.get("action_type", "LocalUnknown") - action_args = req.data.get("action_kwargs", None) # 兼容老版本,后续删除 - if action_args is None: - action_args = req.data.get("action_args") - else: - if "command" in action_args: - action_args = action_args["command"] - # print(f"job_add:{req.device_id} {action_name} {action_kwargs}") - try: - HostNode.get_instance().send_goal(req.device_id, action_type=action_type, action_name=action_name, action_kwargs=action_args, goal_uuid=req.job_id, server_info=req.server_info) - except Exception as e: - for bridge in HostNode.get_instance().bridges: - traceback.print_exc() - if hasattr(bridge, "publish_job_status"): - bridge.publish_job_status({}, req.job_id, "failed", serialize_result_info(traceback.format_exc(), False, {})) - return JobData(jobId=req.job_id) diff --git a/unilabos/app/web/controller.py b/unilabos/app/web/controller.py new file mode 100644 index 00000000..9b0f1ff6 --- /dev/null +++ b/unilabos/app/web/controller.py @@ -0,0 +1,587 @@ +""" +Web API Controller + +提供Web API的控制器函数,处理设备、任务和动作相关的业务逻辑 +""" + +import threading +import time +import traceback +import uuid +from dataclasses import dataclass, field +from typing import Optional, Dict, Any, Tuple + +from unilabos.app.model import JobAddReq, JobData +from unilabos.ros.nodes.presets.host_node import HostNode +from unilabos.utils import logger + + +@dataclass +class JobResult: + """任务结果数据""" + + job_id: str + status: int # 4:SUCCEEDED, 5:CANCELED, 6:ABORTED + result: Dict[str, Any] = field(default_factory=dict) + feedback: Dict[str, Any] = field(default_factory=dict) + timestamp: float = field(default_factory=time.time) + + +class JobResultStore: + """任务结果存储(单例)""" + + _instance: Optional["JobResultStore"] = None + _lock = threading.Lock() + + def __init__(self): + if not hasattr(self, "_initialized"): + self._results: Dict[str, JobResult] = {} + self._results_lock = threading.RLock() + self._initialized = True + + def __new__(cls): + if cls._instance is None: + with cls._lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def store_result( + self, job_id: str, status: int, result: Optional[Dict[str, Any]], feedback: Optional[Dict[str, Any]] = None + ): + """存储任务结果""" + with self._results_lock: + self._results[job_id] = JobResult( + job_id=job_id, + status=status, + result=result or {}, + feedback=feedback or {}, + timestamp=time.time(), + ) + logger.debug(f"[JobResultStore] Stored result for job {job_id[:8]}, status={status}") + + def get_and_remove(self, job_id: str) -> Optional[JobResult]: + """获取并删除任务结果""" + with self._results_lock: + result = self._results.pop(job_id, None) + if result: + logger.debug(f"[JobResultStore] Retrieved and removed result for job {job_id[:8]}") + return result + + def get_result(self, job_id: str) -> Optional[JobResult]: + """仅获取任务结果(不删除)""" + with self._results_lock: + return self._results.get(job_id) + + def cleanup_old_results(self, max_age_seconds: float = 3600): + """清理过期的结果""" + current_time = time.time() + with self._results_lock: + expired_jobs = [ + job_id for job_id, result in self._results.items() if current_time - result.timestamp > max_age_seconds + ] + for job_id in expired_jobs: + del self._results[job_id] + logger.debug(f"[JobResultStore] Cleaned up expired result for job {job_id[:8]}") + + +# 全局结果存储实例 +job_result_store = JobResultStore() + + +def store_job_result( + job_id: str, status: str, result: Optional[Dict[str, Any]], feedback: Optional[Dict[str, Any]] = None +): + """存储任务结果(供外部调用) + + Args: + job_id: 任务ID + status: 状态字符串 ("success", "failed", "cancelled") + result: 结果数据 + feedback: 反馈数据 + """ + # 转换状态字符串为整数 + status_map = { + "success": 4, # SUCCEEDED + "failed": 6, # ABORTED + "cancelled": 5, # CANCELED + "running": 2, # EXECUTING + } + status_int = status_map.get(status, 0) + + # 只存储最终状态 + if status_int in (4, 5, 6): + job_result_store.store_result(job_id, status_int, result, feedback) + + +def get_resources() -> Tuple[bool, Any]: + """获取资源配置 + + Returns: + Tuple[bool, Any]: (是否成功, 资源配置或错误信息) + """ + host_node = HostNode.get_instance(0) + if host_node is None: + return False, "Host node not initialized" + + return True, host_node.resources_config + + +def devices() -> Tuple[bool, Any]: + """获取设备配置 + + Returns: + Tuple[bool, Any]: (是否成功, 设备配置或错误信息) + """ + host_node = HostNode.get_instance(0) + if host_node is None: + return False, "Host node not initialized" + + return True, host_node.devices_config + + +def job_info(job_id: str, remove_after_read: bool = True) -> JobData: + """获取任务信息 + + Args: + job_id: 任务ID + remove_after_read: 是否在读取后删除结果(默认True) + + Returns: + JobData: 任务数据 + """ + # 首先检查结果存储中是否有已完成的结果 + if remove_after_read: + stored_result = job_result_store.get_and_remove(job_id) + else: + stored_result = job_result_store.get_result(job_id) + + if stored_result: + # 有存储的结果,直接返回 + return JobData( + jobId=job_id, + status=stored_result.status, + result=stored_result.result, + ) + + # 没有存储的结果,从 HostNode 获取当前状态 + host_node = HostNode.get_instance(0) + if host_node is None: + return JobData(jobId=job_id, status=0) + + get_goal_status = host_node.get_goal_status(job_id) + return JobData(jobId=job_id, status=get_goal_status) + + +def check_device_action_busy(device_id: str, action_name: str) -> Tuple[bool, Optional[str]]: + """检查设备动作是否正在执行(被占用) + + Args: + device_id: 设备ID + action_name: 动作名称 + + Returns: + Tuple[bool, Optional[str]]: (是否繁忙, 当前执行的job_id或None) + """ + host_node = HostNode.get_instance(0) + if host_node is None: + return False, None + + device_action_key = f"/devices/{device_id}/{action_name}" + + # 检查 _device_action_status 中是否有正在执行的任务 + if device_action_key in host_node._device_action_status: + status = host_node._device_action_status[device_action_key] + if status.job_ids: + # 返回第一个正在执行的job_id + current_job_id = next(iter(status.job_ids.keys()), None) + return True, current_job_id + + return False, None + + +def _get_action_type(device_id: str, action_name: str) -> Optional[str]: + """从注册表自动获取动作类型 + + Args: + device_id: 设备ID + action_name: 动作名称 + + Returns: + 动作类型字符串,未找到返回None + """ + try: + from unilabos.ros.nodes.base_device_node import registered_devices + + # 方法1: 从运行时注册设备获取 + if device_id in registered_devices: + device_info = registered_devices[device_id] + base_node = device_info.get("base_node_instance") + if base_node and hasattr(base_node, "_action_value_mappings"): + action_mappings = base_node._action_value_mappings + # 尝试直接匹配或 auto- 前缀匹配 + for key in [action_name, f"auto-{action_name}"]: + if key in action_mappings: + action_type = action_mappings[key].get("type") + if action_type: + # 转换为字符串格式 + if hasattr(action_type, "__module__") and hasattr(action_type, "__name__"): + return f"{action_type.__module__}.{action_type.__name__}" + return str(action_type) + + # 方法2: 从lab_registry获取 + from unilabos.registry.registry import lab_registry + + host_node = HostNode.get_instance(0) + if host_node and lab_registry: + devices_config = host_node.devices_config + device_class = None + + for tree in devices_config.trees: + node = tree.root_node + if node.res_content.id == device_id: + device_class = node.res_content.klass + break + + if device_class and device_class in lab_registry.device_type_registry: + device_type_info = lab_registry.device_type_registry[device_class] + class_info = device_type_info.get("class", {}) + action_mappings = class_info.get("action_value_mappings", {}) + + for key in [action_name, f"auto-{action_name}"]: + if key in action_mappings: + action_type = action_mappings[key].get("type") + if action_type: + if hasattr(action_type, "__module__") and hasattr(action_type, "__name__"): + return f"{action_type.__module__}.{action_type.__name__}" + return str(action_type) + + except Exception as e: + logger.warning(f"[Controller] Failed to get action type for {device_id}/{action_name}: {str(e)}") + + return None + + +def job_add(req: JobAddReq) -> JobData: + """添加任务(检查设备是否繁忙,繁忙则返回失败) + + Args: + req: 任务添加请求 + + Returns: + JobData: 任务数据(包含状态) + """ + # 服务端自动生成 job_id 和 task_id + job_id = str(uuid.uuid4()) + task_id = str(uuid.uuid4()) + + # 服务端自动生成 server_info + server_info = {"send_timestamp": time.time()} + + host_node = HostNode.get_instance(0) + if host_node is None: + logger.error(f"[Controller] Host node not initialized for job: {job_id[:8]}") + return JobData(jobId=job_id, status=6) # 6 = ABORTED + + # 解析动作信息 + action_name = req.data.get("action", req.action) if req.data else req.action + action_args = req.data.get("action_kwargs") or req.data.get("action_args") if req.data else req.action_args + + if action_args is None: + action_args = req.action_args or {} + elif isinstance(action_args, dict) and "command" in action_args: + action_args = action_args["command"] + + # 自动获取 action_type + action_type = _get_action_type(req.device_id, action_name) + if action_type is None: + logger.error(f"[Controller] Action type not found for {req.device_id}/{action_name}") + return JobData(jobId=job_id, status=6) # ABORTED + + # 检查设备动作是否繁忙 + is_busy, current_job_id = check_device_action_busy(req.device_id, action_name) + + if is_busy: + logger.warning( + f"[Controller] Device action busy: {req.device_id}/{action_name}, " + f"current job: {current_job_id[:8] if current_job_id else 'unknown'}" + ) + # 返回失败状态,status=6 表示 ABORTED + return JobData(jobId=job_id, status=6) + + # 设备空闲,提交任务执行 + try: + from unilabos.app.ws_client import QueueItem + + device_action_key = f"/devices/{req.device_id}/{action_name}" + queue_item = QueueItem( + task_type="job_call_back_status", + device_id=req.device_id, + action_name=action_name, + task_id=task_id, + job_id=job_id, + device_action_key=device_action_key, + ) + + host_node.send_goal( + queue_item, + action_type=action_type, + action_kwargs=action_args, + server_info=server_info, + ) + + logger.info(f"[Controller] Job submitted: {job_id[:8]} -> {req.device_id}/{action_name}") + # 返回已接受状态,status=1 表示 ACCEPTED + return JobData(jobId=job_id, status=1) + + except ValueError as e: + # ActionClient not found 等错误 + logger.error(f"[Controller] Action not available: {str(e)}") + return JobData(jobId=job_id, status=6) # ABORTED + + except Exception as e: + logger.error(f"[Controller] Error submitting job: {str(e)}") + traceback.print_exc() + return JobData(jobId=job_id, status=6) # ABORTED + + +def get_online_devices() -> Tuple[bool, Dict[str, Any]]: + """获取在线设备列表 + + Returns: + Tuple[bool, Dict]: (是否成功, 在线设备信息) + """ + host_node = HostNode.get_instance(0) + if host_node is None: + return False, {"error": "Host node not initialized"} + + try: + from unilabos.ros.nodes.base_device_node import registered_devices + + online_devices = {} + for device_key in host_node._online_devices: + # device_key 格式: "namespace/device_id" + parts = device_key.split("/") + if len(parts) >= 2: + device_id = parts[-1] + else: + device_id = device_key + + # 获取设备详细信息 + device_info = registered_devices.get(device_id, {}) + machine_name = host_node.device_machine_names.get(device_id, "未知") + + online_devices[device_id] = { + "device_key": device_key, + "namespace": host_node.devices_names.get(device_id, ""), + "machine_name": machine_name, + "uuid": device_info.get("uuid", "") if device_info else "", + "node_name": device_info.get("node_name", "") if device_info else "", + } + + return True, { + "online_devices": online_devices, + "total_count": len(online_devices), + "timestamp": time.time(), + } + + except Exception as e: + logger.error(f"[Controller] Error getting online devices: {str(e)}") + traceback.print_exc() + return False, {"error": str(e)} + + +def get_device_actions(device_id: str) -> Tuple[bool, Dict[str, Any]]: + """获取设备可用的动作列表 + + Args: + device_id: 设备ID + + Returns: + Tuple[bool, Dict]: (是否成功, 动作列表信息) + """ + host_node = HostNode.get_instance(0) + if host_node is None: + return False, {"error": "Host node not initialized"} + + try: + from unilabos.ros.nodes.base_device_node import registered_devices + from unilabos.app.web.utils.action_utils import get_action_info + + # 检查设备是否已注册 + if device_id not in registered_devices: + return False, {"error": f"Device not found: {device_id}"} + + device_info = registered_devices[device_id] + actions = device_info.get("actions", {}) + + actions_list = {} + for action_name, action_server in actions.items(): + try: + action_info = get_action_info(action_server, action_name) + # 检查动作是否繁忙 + is_busy, current_job = check_device_action_busy(device_id, action_name) + actions_list[action_name] = { + **action_info, + "is_busy": is_busy, + "current_job_id": current_job[:8] if current_job else None, + } + except Exception as e: + logger.warning(f"[Controller] Error getting action info for {action_name}: {str(e)}") + actions_list[action_name] = { + "type_name": "unknown", + "action_path": f"/devices/{device_id}/{action_name}", + "is_busy": False, + "error": str(e), + } + + return True, { + "device_id": device_id, + "actions": actions_list, + "action_count": len(actions_list), + } + + except Exception as e: + logger.error(f"[Controller] Error getting device actions: {str(e)}") + traceback.print_exc() + return False, {"error": str(e)} + + +def get_action_schema(device_id: str, action_name: str) -> Tuple[bool, Dict[str, Any]]: + """获取动作的Schema详情 + + Args: + device_id: 设备ID + action_name: 动作名称 + + Returns: + Tuple[bool, Dict]: (是否成功, Schema信息) + """ + host_node = HostNode.get_instance(0) + if host_node is None: + return False, {"error": "Host node not initialized"} + + try: + from unilabos.registry.registry import lab_registry + from unilabos.ros.nodes.base_device_node import registered_devices + + result = { + "device_id": device_id, + "action_name": action_name, + "schema": None, + "goal_default": None, + "action_type": None, + "is_busy": False, + } + + # 检查动作是否繁忙 + is_busy, current_job = check_device_action_busy(device_id, action_name) + result["is_busy"] = is_busy + result["current_job_id"] = current_job[:8] if current_job else None + + # 方法1: 从 registered_devices 获取运行时信息 + if device_id in registered_devices: + device_info = registered_devices[device_id] + base_node = device_info.get("base_node_instance") + + if base_node and hasattr(base_node, "_action_value_mappings"): + action_mappings = base_node._action_value_mappings + if action_name in action_mappings: + mapping = action_mappings[action_name] + result["schema"] = mapping.get("schema") + result["goal_default"] = mapping.get("goal_default") + result["action_type"] = str(mapping.get("type", "")) + + # 方法2: 从 lab_registry 获取注册表信息(如果运行时没有) + if result["schema"] is None and lab_registry: + # 尝试查找设备类型 + devices_config = host_node.devices_config + device_class = None + + # 从配置中获取设备类型 + for tree in devices_config.trees: + node = tree.root_node + if node.res_content.id == device_id: + device_class = node.res_content.klass + break + + if device_class and device_class in lab_registry.device_type_registry: + device_type_info = lab_registry.device_type_registry[device_class] + class_info = device_type_info.get("class", {}) + action_mappings = class_info.get("action_value_mappings", {}) + + # 尝试直接匹配或 auto- 前缀匹配 + for key in [action_name, f"auto-{action_name}"]: + if key in action_mappings: + mapping = action_mappings[key] + result["schema"] = mapping.get("schema") + result["goal_default"] = mapping.get("goal_default") + result["action_type"] = str(mapping.get("type", "")) + result["handles"] = mapping.get("handles", {}) + result["placeholder_keys"] = mapping.get("placeholder_keys", {}) + break + + if result["schema"] is None: + return False, {"error": f"Action schema not found: {device_id}/{action_name}"} + + return True, result + + except Exception as e: + logger.error(f"[Controller] Error getting action schema: {str(e)}") + traceback.print_exc() + return False, {"error": str(e)} + + +def get_all_available_actions() -> Tuple[bool, Dict[str, Any]]: + """获取所有设备的可用动作 + + Returns: + Tuple[bool, Dict]: (是否成功, 所有设备的动作信息) + """ + host_node = HostNode.get_instance(0) + if host_node is None: + return False, {"error": "Host node not initialized"} + + try: + from unilabos.ros.nodes.base_device_node import registered_devices + from unilabos.app.web.utils.action_utils import get_action_info + + all_actions = {} + total_action_count = 0 + + for device_id, device_info in registered_devices.items(): + actions = device_info.get("actions", {}) + device_actions = {} + + for action_name, action_server in actions.items(): + try: + action_info = get_action_info(action_server, action_name) + is_busy, current_job = check_device_action_busy(device_id, action_name) + device_actions[action_name] = { + "type_name": action_info.get("type_name", ""), + "action_path": action_info.get("action_path", ""), + "is_busy": is_busy, + "current_job_id": current_job[:8] if current_job else None, + } + total_action_count += 1 + except Exception as e: + logger.warning(f"[Controller] Error processing action {device_id}/{action_name}: {str(e)}") + + if device_actions: + all_actions[device_id] = { + "actions": device_actions, + "action_count": len(device_actions), + "machine_name": host_node.device_machine_names.get(device_id, "未知"), + } + + return True, { + "devices": all_actions, + "device_count": len(all_actions), + "total_action_count": total_action_count, + "timestamp": time.time(), + } + + except Exception as e: + logger.error(f"[Controller] Error getting all available actions: {str(e)}") + traceback.print_exc() + return False, {"error": str(e)} diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index ac9930f6..18919d78 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -791,6 +791,16 @@ class HostNode(BaseROS2DeviceNode): del self._goals[job_id] self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals") + # 存储结果供 HTTP API 查询 + try: + from unilabos.app.web.controller import store_job_result + if goal_status == GoalStatus.STATUS_CANCELED: + store_job_result(job_id, status, return_info, {}) + else: + store_job_result(job_id, status, return_info, result_data) + except ImportError: + pass # controller 模块可能未加载 + # 发布状态到桥接器 if job_id: for bridge in self.bridges: