Add backend api and update doc

This commit is contained in:
Xuwznln
2025-11-26 19:03:31 +08:00
parent f1ad0c9c96
commit ffc583e9d5
7 changed files with 1049 additions and 66 deletions

View File

@@ -0,0 +1,334 @@
# HTTP API 指南
本文档介绍如何通过 HTTP API 与 Uni-Lab-OS 进行交互,包括查询设备、提交任务和获取结果。
## 概述
Uni-Lab-OS 提供 RESTful HTTP API允许外部系统通过标准 HTTP 请求控制实验室设备。API 基于 FastAPI 构建,默认运行在 `http://localhost:8002`
### 基础信息
- **Base URL**: `http://localhost:8002/api/v1`
- **Content-Type**: `application/json`
- **响应格式**: JSON
### 通用响应结构
```json
{
"code": 0,
"data": { ... },
"message": "success"
}
```
| 字段 | 类型 | 说明 |
| --------- | ------ | ------------------ |
| `code` | int | 状态码0 表示成功 |
| `data` | object | 响应数据 |
| `message` | string | 响应消息 |
## 快速开始
以下是一个完整的工作流示例:查询设备 → 获取动作 → 提交任务 → 获取结果。
### 步骤 1: 获取在线设备
```bash
curl -X GET "http://localhost:8002/api/v1/online-devices"
```
**响应示例**:
```json
{
"code": 0,
"data": {
"online_devices": {
"host_node": {
"device_key": "/host_node",
"namespace": "",
"machine_name": "本地",
"uuid": "xxx-xxx-xxx",
"node_name": "host_node"
}
},
"total_count": 1,
"timestamp": 1732612345.123
},
"message": "success"
}
```
### 步骤 2: 获取设备可用动作
```bash
curl -X GET "http://localhost:8002/api/v1/devices/host_node/actions"
```
**响应示例**:
```json
{
"code": 0,
"data": {
"device_id": "host_node",
"actions": {
"test_latency": {
"type_name": "unilabos_msgs.action._empty_in.EmptyIn",
"type_name_convert": "unilabos_msgs/action/_empty_in/EmptyIn",
"action_path": "/devices/host_node/test_latency",
"goal_info": "{}",
"is_busy": false,
"current_job_id": null
},
"create_resource": {
"type_name": "unilabos_msgs.action._resource_create_from_outer_easy.ResourceCreateFromOuterEasy",
"action_path": "/devices/host_node/create_resource",
"goal_info": "{res_id: '', device_id: '', class_name: '', ...}",
"is_busy": false,
"current_job_id": null
}
},
"action_count": 5
},
"message": "success"
}
```
**动作状态字段说明**:
| 字段 | 说明 |
| ---------------- | ----------------------------- |
| `type_name` | 动作类型的完整名称 |
| `action_path` | ROS2 动作路径 |
| `goal_info` | 动作参数模板 |
| `is_busy` | 动作是否正在执行 |
| `current_job_id` | 当前执行的任务 ID如果繁忙 |
### 步骤 3: 提交任务
```bash
curl -X POST "http://localhost:8002/api/v1/job/add" \
-H "Content-Type: application/json" \
-d '{"device_id":"host_node","action":"test_latency","action_args":{}}'
```
**请求体**:
```json
{
"device_id": "host_node",
"action": "test_latency",
"action_args": {}
}
```
**请求参数说明**:
| 字段 | 类型 | 必填 | 说明 |
| ------------- | ------ | ---- | ---------------------------------- |
| `device_id` | string | ✓ | 目标设备 ID |
| `action` | string | ✓ | 动作名称 |
| `action_args` | object | ✓ | 动作参数(根据动作类型不同而变化) |
**响应示例**:
```json
{
"code": 0,
"data": {
"jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd",
"status": 1,
"result": {}
},
"message": "success"
}
```
**任务状态码**:
| 状态码 | 含义 | 说明 |
| ------ | --------- | ------------------------------ |
| 0 | UNKNOWN | 未知状态 |
| 1 | ACCEPTED | 任务已接受,等待执行 |
| 2 | EXECUTING | 任务执行中 |
| 3 | CANCELING | 任务取消中 |
| 4 | SUCCEEDED | 任务成功完成 |
| 5 | CANCELED | 任务已取消 |
| 6 | ABORTED | 任务中止(设备繁忙或执行失败) |
### 步骤 4: 查询任务状态和结果
```bash
curl -X GET "http://localhost:8002/api/v1/job/b6acb586-733a-42ab-9f73-55c9a52aa8bd/status"
```
**响应示例(执行中)**:
```json
{
"code": 0,
"data": {
"jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd",
"status": 2,
"result": {}
},
"message": "success"
}
```
**响应示例(执行完成)**:
```json
{
"code": 0,
"data": {
"jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd",
"status": 4,
"result": {
"error": "",
"suc": true,
"return_value": {
"avg_rtt_ms": 103.99,
"avg_time_diff_ms": 7181.55,
"max_time_error_ms": 7210.57,
"task_delay_ms": -1,
"raw_delay_ms": 33.19,
"test_count": 5,
"status": "success"
}
}
},
"message": "success"
}
```
> **注意**: 任务结果在首次查询后会被自动删除,请确保保存返回的结果数据。
## API 端点列表
### 设备相关
| 端点 | 方法 | 说明 |
| ---------------------------------------------------------- | ---- | ---------------------- |
| `/api/v1/online-devices` | GET | 获取在线设备列表 |
| `/api/v1/devices` | GET | 获取设备配置 |
| `/api/v1/devices/{device_id}/actions` | GET | 获取指定设备的可用动作 |
| `/api/v1/devices/{device_id}/actions/{action_name}/schema` | GET | 获取动作参数 Schema |
| `/api/v1/actions` | GET | 获取所有设备的可用动作 |
### 任务相关
| 端点 | 方法 | 说明 |
| ----------------------------- | ---- | ------------------ |
| `/api/v1/job/add` | POST | 提交新任务 |
| `/api/v1/job/{job_id}/status` | GET | 查询任务状态和结果 |
### 资源相关
| 端点 | 方法 | 说明 |
| ------------------- | ---- | ------------ |
| `/api/v1/resources` | GET | 获取资源列表 |
## 常见动作示例
### test_latency - 延迟测试
测试系统延迟,无需参数。
```bash
curl -X POST "http://localhost:8002/api/v1/job/add" \
-H "Content-Type: application/json" \
-d '{"device_id":"host_node","action":"test_latency","action_args":{}}'
```
### create_resource - 创建资源
在设备上创建新资源。
```bash
curl -X POST "http://localhost:8002/api/v1/job/add" \
-H "Content-Type: application/json" \
-d '{
"device_id": "host_node",
"action": "create_resource",
"action_args": {
"res_id": "my_plate",
"device_id": "host_node",
"class_name": "Plate",
"parent": "deck",
"bind_locations": {"x": 0, "y": 0, "z": 0}
}
}'
```
## 错误处理
### 设备繁忙
当设备正在执行其他任务时,提交新任务会返回 `status: 6`ABORTED
```json
{
"code": 0,
"data": {
"jobId": "xxx",
"status": 6,
"result": {}
},
"message": "success"
}
```
此时应等待当前任务完成后重试,或使用 `/devices/{device_id}/actions` 检查动作的 `is_busy` 状态。
### 参数错误
```json
{
"code": 2002,
"data": { ... },
"message": "device_id is required"
}
```
## 轮询策略
推荐的任务状态轮询策略:
```python
import requests
import time
def wait_for_job(job_id, timeout=60, interval=0.5):
"""等待任务完成并返回结果"""
start_time = time.time()
while time.time() - start_time < timeout:
response = requests.get(f"http://localhost:8002/api/v1/job/{job_id}/status")
data = response.json()["data"]
status = data["status"]
if status in (4, 5, 6): # SUCCEEDED, CANCELED, ABORTED
return data
time.sleep(interval)
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")
# 使用示例
response = requests.post(
"http://localhost:8002/api/v1/job/add",
json={"device_id": "host_node", "action": "test_latency", "action_args": {}}
)
job_id = response.json()["data"]["jobId"]
result = wait_for_job(job_id)
print(result)
```
## 相关文档
- [设备注册指南](add_device.md)
- [动作定义指南](add_action.md)
- [网络架构概述](networking_overview.md)

View File

@@ -7,3 +7,17 @@ Uni-Lab-OS 是一个开源的实验室自动化操作系统,提供统一的设
intro.md
```
## 开发者指南
```{toctree}
:maxdepth: 2
developer_guide/http_api.md
developer_guide/networking_overview.md
developer_guide/add_device.md
developer_guide/add_action.md
developer_guide/add_registry.md
developer_guide/add_yaml.md
developer_guide/action_includes.md
```

View File

@@ -51,21 +51,25 @@ class Resp(BaseModel):
class JobAddReq(BaseModel):
device_id: str = Field(examples=["Gripper"], description="device id")
action: str = Field(examples=["_execute_driver_command_async"], description="action name", default="")
action_type: str = Field(examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action name", default="")
action_args: dict = Field(examples=[{'string': 'string'}], description="action name", default="")
task_id: str = Field(examples=["task_id"], description="task uuid")
job_id: str = Field(examples=["job_id"], description="goal uuid")
node_id: str = Field(examples=["node_id"], description="node uuid")
server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info")
action_type: str = Field(
examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default=""
)
action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict)
task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="")
job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="")
node_id: str = Field(examples=["node_id"], description="node uuid", default="")
server_info: dict = Field(
examples=[{"send_timestamp": 1717000000.0}],
description="server info (auto-generated if empty)",
default_factory=dict,
)
data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}], default={})
data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}], default_factory=dict)
class JobStepFinishReq(BaseModel):
token: str = Field(examples=["030944"], description="token")
request_time: str = Field(
examples=["2024-12-12 12:12:12.xxx"], description="requestTime"
)
request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime")
data: dict = Field(
examples=[
{
@@ -83,9 +87,7 @@ class JobStepFinishReq(BaseModel):
class JobPreintakeFinishReq(BaseModel):
token: str = Field(examples=["030944"], description="token")
request_time: str = Field(
examples=["2024-12-12 12:12:12.xxx"], description="requestTime"
)
request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime")
data: dict = Field(
examples=[
{
@@ -102,9 +104,7 @@ class JobPreintakeFinishReq(BaseModel):
class JobFinishReq(BaseModel):
token: str = Field(examples=["030944"], description="token")
request_time: str = Field(
examples=["2024-12-12 12:12:12.xxx"], description="requestTime"
)
request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime")
data: dict = Field(
examples=[
{
@@ -133,6 +133,10 @@ class JobData(BaseModel):
default=0,
description="0:UNKNOWN, 1:ACCEPTED, 2:EXECUTING, 3:CANCELING, 4:SUCCEEDED, 5:CANCELED, 6:ABORTED",
)
result: dict = Field(
default_factory=dict,
description="Job result data (available when status is SUCCEEDED/CANCELED/ABORTED)",
)
class JobStatusResp(Resp):

View File

@@ -9,13 +9,22 @@ import asyncio
import yaml
from unilabos.app.web.controler import devices, job_add, job_info
from unilabos.app.web.controller import (
devices,
job_add,
job_info,
get_online_devices,
get_device_actions,
get_action_schema,
get_all_available_actions,
)
from unilabos.app.model import (
Resp,
RespCode,
JobStatusResp,
JobAddResp,
JobAddReq,
JobData,
)
from unilabos.app.web.utils.host_utils import get_host_node_info
from unilabos.registry.registry import lab_registry
@@ -1234,6 +1243,65 @@ def get_devices():
return Resp(data=dict(data))
@api.get("/online-devices", summary="Online devices list", response_model=Resp)
def api_get_online_devices():
"""获取在线设备列表
返回当前在线的设备列表包含设备ID、命名空间、机器名等信息
"""
isok, data = get_online_devices()
if not isok:
return Resp(code=RespCode.ErrorHostNotInit, message=data.get("error", "Unknown error"))
return Resp(data=data)
@api.get("/devices/{device_id}/actions", summary="Device actions list", response_model=Resp)
def api_get_device_actions(device_id: str):
"""获取设备可用的动作列表
Args:
device_id: 设备ID
返回指定设备的所有可用动作,包含动作名称、类型、是否繁忙等信息
"""
isok, data = get_device_actions(device_id)
if not isok:
return Resp(code=RespCode.ErrorInvalidReq, message=data.get("error", "Unknown error"))
return Resp(data=data)
@api.get("/devices/{device_id}/actions/{action_name}/schema", summary="Action schema", response_model=Resp)
def api_get_action_schema(device_id: str, action_name: str):
"""获取动作的Schema详情
Args:
device_id: 设备ID
action_name: 动作名称
返回动作的参数Schema、默认值、类型等详细信息
"""
isok, data = get_action_schema(device_id, action_name)
if not isok:
return Resp(code=RespCode.ErrorInvalidReq, message=data.get("error", "Unknown error"))
return Resp(data=data)
@api.get("/actions", summary="All available actions", response_model=Resp)
def api_get_all_actions():
"""获取所有设备的可用动作
返回所有已注册设备的动作列表,包含设备信息和各动作的状态
"""
isok, data = get_all_available_actions()
if not isok:
return Resp(code=RespCode.ErrorHostNotInit, message=data.get("error", "Unknown error"))
return Resp(data=data)
@api.get("/job/{id}/status", summary="Job status", response_model=JobStatusResp)
def job_status(id: str):
"""获取任务状态"""
@@ -1244,11 +1312,22 @@ def job_status(id: str):
@api.post("/job/add", summary="Create job", response_model=JobAddResp)
def post_job_add(req: JobAddReq):
"""创建任务"""
device_id = req.device_id
if not req.data:
return Resp(code=RespCode.ErrorInvalidReq, message="Invalid request data")
# 检查必要参数device_id 和 action
if not req.device_id:
return JobAddResp(
data=JobData(jobId="", status=6),
code=RespCode.ErrorInvalidReq,
message="device_id is required",
)
action_name = req.data.get("action", req.action) if req.data else req.action
if not action_name:
return JobAddResp(
data=JobData(jobId="", status=6),
code=RespCode.ErrorInvalidReq,
message="action is required",
)
req.device_id = device_id
data = job_add(req)
return JobAddResp(data=data)

View File

@@ -1,45 +0,0 @@
import json
import traceback
import uuid
from unilabos.app.model import JobAddReq, JobData
from unilabos.ros.nodes.presets.host_node import HostNode
from unilabos.utils.type_check import serialize_result_info
def get_resources() -> tuple:
if HostNode.get_instance() is None:
return False, "Host node not initialized"
return True, HostNode.get_instance().resources_config
def devices() -> tuple:
if HostNode.get_instance() is None:
return False, "Host node not initialized"
return True, HostNode.get_instance().devices_config
def job_info(id: str):
get_goal_status = HostNode.get_instance().get_goal_status(id)
return JobData(jobId=id, status=get_goal_status)
def job_add(req: JobAddReq) -> JobData:
if req.job_id is None:
req.job_id = str(uuid.uuid4())
action_name = req.data["action"]
action_type = req.data.get("action_type", "LocalUnknown")
action_args = req.data.get("action_kwargs", None) # 兼容老版本,后续删除
if action_args is None:
action_args = req.data.get("action_args")
else:
if "command" in action_args:
action_args = action_args["command"]
# print(f"job_add:{req.device_id} {action_name} {action_kwargs}")
try:
HostNode.get_instance().send_goal(req.device_id, action_type=action_type, action_name=action_name, action_kwargs=action_args, goal_uuid=req.job_id, server_info=req.server_info)
except Exception as e:
for bridge in HostNode.get_instance().bridges:
traceback.print_exc()
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status({}, req.job_id, "failed", serialize_result_info(traceback.format_exc(), False, {}))
return JobData(jobId=req.job_id)

View File

@@ -0,0 +1,587 @@
"""
Web API Controller
提供Web API的控制器函数处理设备、任务和动作相关的业务逻辑
"""
import threading
import time
import traceback
import uuid
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, Tuple
from unilabos.app.model import JobAddReq, JobData
from unilabos.ros.nodes.presets.host_node import HostNode
from unilabos.utils import logger
@dataclass
class JobResult:
"""任务结果数据"""
job_id: str
status: int # 4:SUCCEEDED, 5:CANCELED, 6:ABORTED
result: Dict[str, Any] = field(default_factory=dict)
feedback: Dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
class JobResultStore:
"""任务结果存储(单例)"""
_instance: Optional["JobResultStore"] = None
_lock = threading.Lock()
def __init__(self):
if not hasattr(self, "_initialized"):
self._results: Dict[str, JobResult] = {}
self._results_lock = threading.RLock()
self._initialized = True
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def store_result(
self, job_id: str, status: int, result: Optional[Dict[str, Any]], feedback: Optional[Dict[str, Any]] = None
):
"""存储任务结果"""
with self._results_lock:
self._results[job_id] = JobResult(
job_id=job_id,
status=status,
result=result or {},
feedback=feedback or {},
timestamp=time.time(),
)
logger.debug(f"[JobResultStore] Stored result for job {job_id[:8]}, status={status}")
def get_and_remove(self, job_id: str) -> Optional[JobResult]:
"""获取并删除任务结果"""
with self._results_lock:
result = self._results.pop(job_id, None)
if result:
logger.debug(f"[JobResultStore] Retrieved and removed result for job {job_id[:8]}")
return result
def get_result(self, job_id: str) -> Optional[JobResult]:
"""仅获取任务结果(不删除)"""
with self._results_lock:
return self._results.get(job_id)
def cleanup_old_results(self, max_age_seconds: float = 3600):
"""清理过期的结果"""
current_time = time.time()
with self._results_lock:
expired_jobs = [
job_id for job_id, result in self._results.items() if current_time - result.timestamp > max_age_seconds
]
for job_id in expired_jobs:
del self._results[job_id]
logger.debug(f"[JobResultStore] Cleaned up expired result for job {job_id[:8]}")
# 全局结果存储实例
job_result_store = JobResultStore()
def store_job_result(
job_id: str, status: str, result: Optional[Dict[str, Any]], feedback: Optional[Dict[str, Any]] = None
):
"""存储任务结果(供外部调用)
Args:
job_id: 任务ID
status: 状态字符串 ("success", "failed", "cancelled")
result: 结果数据
feedback: 反馈数据
"""
# 转换状态字符串为整数
status_map = {
"success": 4, # SUCCEEDED
"failed": 6, # ABORTED
"cancelled": 5, # CANCELED
"running": 2, # EXECUTING
}
status_int = status_map.get(status, 0)
# 只存储最终状态
if status_int in (4, 5, 6):
job_result_store.store_result(job_id, status_int, result, feedback)
def get_resources() -> Tuple[bool, Any]:
"""获取资源配置
Returns:
Tuple[bool, Any]: (是否成功, 资源配置或错误信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, "Host node not initialized"
return True, host_node.resources_config
def devices() -> Tuple[bool, Any]:
"""获取设备配置
Returns:
Tuple[bool, Any]: (是否成功, 设备配置或错误信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, "Host node not initialized"
return True, host_node.devices_config
def job_info(job_id: str, remove_after_read: bool = True) -> JobData:
"""获取任务信息
Args:
job_id: 任务ID
remove_after_read: 是否在读取后删除结果默认True
Returns:
JobData: 任务数据
"""
# 首先检查结果存储中是否有已完成的结果
if remove_after_read:
stored_result = job_result_store.get_and_remove(job_id)
else:
stored_result = job_result_store.get_result(job_id)
if stored_result:
# 有存储的结果,直接返回
return JobData(
jobId=job_id,
status=stored_result.status,
result=stored_result.result,
)
# 没有存储的结果,从 HostNode 获取当前状态
host_node = HostNode.get_instance(0)
if host_node is None:
return JobData(jobId=job_id, status=0)
get_goal_status = host_node.get_goal_status(job_id)
return JobData(jobId=job_id, status=get_goal_status)
def check_device_action_busy(device_id: str, action_name: str) -> Tuple[bool, Optional[str]]:
"""检查设备动作是否正在执行(被占用)
Args:
device_id: 设备ID
action_name: 动作名称
Returns:
Tuple[bool, Optional[str]]: (是否繁忙, 当前执行的job_id或None)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, None
device_action_key = f"/devices/{device_id}/{action_name}"
# 检查 _device_action_status 中是否有正在执行的任务
if device_action_key in host_node._device_action_status:
status = host_node._device_action_status[device_action_key]
if status.job_ids:
# 返回第一个正在执行的job_id
current_job_id = next(iter(status.job_ids.keys()), None)
return True, current_job_id
return False, None
def _get_action_type(device_id: str, action_name: str) -> Optional[str]:
"""从注册表自动获取动作类型
Args:
device_id: 设备ID
action_name: 动作名称
Returns:
动作类型字符串未找到返回None
"""
try:
from unilabos.ros.nodes.base_device_node import registered_devices
# 方法1: 从运行时注册设备获取
if device_id in registered_devices:
device_info = registered_devices[device_id]
base_node = device_info.get("base_node_instance")
if base_node and hasattr(base_node, "_action_value_mappings"):
action_mappings = base_node._action_value_mappings
# 尝试直接匹配或 auto- 前缀匹配
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
action_type = action_mappings[key].get("type")
if action_type:
# 转换为字符串格式
if hasattr(action_type, "__module__") and hasattr(action_type, "__name__"):
return f"{action_type.__module__}.{action_type.__name__}"
return str(action_type)
# 方法2: 从lab_registry获取
from unilabos.registry.registry import lab_registry
host_node = HostNode.get_instance(0)
if host_node and lab_registry:
devices_config = host_node.devices_config
device_class = None
for tree in devices_config.trees:
node = tree.root_node
if node.res_content.id == device_id:
device_class = node.res_content.klass
break
if device_class and device_class in lab_registry.device_type_registry:
device_type_info = lab_registry.device_type_registry[device_class]
class_info = device_type_info.get("class", {})
action_mappings = class_info.get("action_value_mappings", {})
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
action_type = action_mappings[key].get("type")
if action_type:
if hasattr(action_type, "__module__") and hasattr(action_type, "__name__"):
return f"{action_type.__module__}.{action_type.__name__}"
return str(action_type)
except Exception as e:
logger.warning(f"[Controller] Failed to get action type for {device_id}/{action_name}: {str(e)}")
return None
def job_add(req: JobAddReq) -> JobData:
"""添加任务(检查设备是否繁忙,繁忙则返回失败)
Args:
req: 任务添加请求
Returns:
JobData: 任务数据(包含状态)
"""
# 服务端自动生成 job_id 和 task_id
job_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
# 服务端自动生成 server_info
server_info = {"send_timestamp": time.time()}
host_node = HostNode.get_instance(0)
if host_node is None:
logger.error(f"[Controller] Host node not initialized for job: {job_id[:8]}")
return JobData(jobId=job_id, status=6) # 6 = ABORTED
# 解析动作信息
action_name = req.data.get("action", req.action) if req.data else req.action
action_args = req.data.get("action_kwargs") or req.data.get("action_args") if req.data else req.action_args
if action_args is None:
action_args = req.action_args or {}
elif isinstance(action_args, dict) and "command" in action_args:
action_args = action_args["command"]
# 自动获取 action_type
action_type = _get_action_type(req.device_id, action_name)
if action_type is None:
logger.error(f"[Controller] Action type not found for {req.device_id}/{action_name}")
return JobData(jobId=job_id, status=6) # ABORTED
# 检查设备动作是否繁忙
is_busy, current_job_id = check_device_action_busy(req.device_id, action_name)
if is_busy:
logger.warning(
f"[Controller] Device action busy: {req.device_id}/{action_name}, "
f"current job: {current_job_id[:8] if current_job_id else 'unknown'}"
)
# 返回失败状态status=6 表示 ABORTED
return JobData(jobId=job_id, status=6)
# 设备空闲,提交任务执行
try:
from unilabos.app.ws_client import QueueItem
device_action_key = f"/devices/{req.device_id}/{action_name}"
queue_item = QueueItem(
task_type="job_call_back_status",
device_id=req.device_id,
action_name=action_name,
task_id=task_id,
job_id=job_id,
device_action_key=device_action_key,
)
host_node.send_goal(
queue_item,
action_type=action_type,
action_kwargs=action_args,
server_info=server_info,
)
logger.info(f"[Controller] Job submitted: {job_id[:8]} -> {req.device_id}/{action_name}")
# 返回已接受状态status=1 表示 ACCEPTED
return JobData(jobId=job_id, status=1)
except ValueError as e:
# ActionClient not found 等错误
logger.error(f"[Controller] Action not available: {str(e)}")
return JobData(jobId=job_id, status=6) # ABORTED
except Exception as e:
logger.error(f"[Controller] Error submitting job: {str(e)}")
traceback.print_exc()
return JobData(jobId=job_id, status=6) # ABORTED
def get_online_devices() -> Tuple[bool, Dict[str, Any]]:
"""获取在线设备列表
Returns:
Tuple[bool, Dict]: (是否成功, 在线设备信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, {"error": "Host node not initialized"}
try:
from unilabos.ros.nodes.base_device_node import registered_devices
online_devices = {}
for device_key in host_node._online_devices:
# device_key 格式: "namespace/device_id"
parts = device_key.split("/")
if len(parts) >= 2:
device_id = parts[-1]
else:
device_id = device_key
# 获取设备详细信息
device_info = registered_devices.get(device_id, {})
machine_name = host_node.device_machine_names.get(device_id, "未知")
online_devices[device_id] = {
"device_key": device_key,
"namespace": host_node.devices_names.get(device_id, ""),
"machine_name": machine_name,
"uuid": device_info.get("uuid", "") if device_info else "",
"node_name": device_info.get("node_name", "") if device_info else "",
}
return True, {
"online_devices": online_devices,
"total_count": len(online_devices),
"timestamp": time.time(),
}
except Exception as e:
logger.error(f"[Controller] Error getting online devices: {str(e)}")
traceback.print_exc()
return False, {"error": str(e)}
def get_device_actions(device_id: str) -> Tuple[bool, Dict[str, Any]]:
"""获取设备可用的动作列表
Args:
device_id: 设备ID
Returns:
Tuple[bool, Dict]: (是否成功, 动作列表信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, {"error": "Host node not initialized"}
try:
from unilabos.ros.nodes.base_device_node import registered_devices
from unilabos.app.web.utils.action_utils import get_action_info
# 检查设备是否已注册
if device_id not in registered_devices:
return False, {"error": f"Device not found: {device_id}"}
device_info = registered_devices[device_id]
actions = device_info.get("actions", {})
actions_list = {}
for action_name, action_server in actions.items():
try:
action_info = get_action_info(action_server, action_name)
# 检查动作是否繁忙
is_busy, current_job = check_device_action_busy(device_id, action_name)
actions_list[action_name] = {
**action_info,
"is_busy": is_busy,
"current_job_id": current_job[:8] if current_job else None,
}
except Exception as e:
logger.warning(f"[Controller] Error getting action info for {action_name}: {str(e)}")
actions_list[action_name] = {
"type_name": "unknown",
"action_path": f"/devices/{device_id}/{action_name}",
"is_busy": False,
"error": str(e),
}
return True, {
"device_id": device_id,
"actions": actions_list,
"action_count": len(actions_list),
}
except Exception as e:
logger.error(f"[Controller] Error getting device actions: {str(e)}")
traceback.print_exc()
return False, {"error": str(e)}
def get_action_schema(device_id: str, action_name: str) -> Tuple[bool, Dict[str, Any]]:
"""获取动作的Schema详情
Args:
device_id: 设备ID
action_name: 动作名称
Returns:
Tuple[bool, Dict]: (是否成功, Schema信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, {"error": "Host node not initialized"}
try:
from unilabos.registry.registry import lab_registry
from unilabos.ros.nodes.base_device_node import registered_devices
result = {
"device_id": device_id,
"action_name": action_name,
"schema": None,
"goal_default": None,
"action_type": None,
"is_busy": False,
}
# 检查动作是否繁忙
is_busy, current_job = check_device_action_busy(device_id, action_name)
result["is_busy"] = is_busy
result["current_job_id"] = current_job[:8] if current_job else None
# 方法1: 从 registered_devices 获取运行时信息
if device_id in registered_devices:
device_info = registered_devices[device_id]
base_node = device_info.get("base_node_instance")
if base_node and hasattr(base_node, "_action_value_mappings"):
action_mappings = base_node._action_value_mappings
if action_name in action_mappings:
mapping = action_mappings[action_name]
result["schema"] = mapping.get("schema")
result["goal_default"] = mapping.get("goal_default")
result["action_type"] = str(mapping.get("type", ""))
# 方法2: 从 lab_registry 获取注册表信息(如果运行时没有)
if result["schema"] is None and lab_registry:
# 尝试查找设备类型
devices_config = host_node.devices_config
device_class = None
# 从配置中获取设备类型
for tree in devices_config.trees:
node = tree.root_node
if node.res_content.id == device_id:
device_class = node.res_content.klass
break
if device_class and device_class in lab_registry.device_type_registry:
device_type_info = lab_registry.device_type_registry[device_class]
class_info = device_type_info.get("class", {})
action_mappings = class_info.get("action_value_mappings", {})
# 尝试直接匹配或 auto- 前缀匹配
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
mapping = action_mappings[key]
result["schema"] = mapping.get("schema")
result["goal_default"] = mapping.get("goal_default")
result["action_type"] = str(mapping.get("type", ""))
result["handles"] = mapping.get("handles", {})
result["placeholder_keys"] = mapping.get("placeholder_keys", {})
break
if result["schema"] is None:
return False, {"error": f"Action schema not found: {device_id}/{action_name}"}
return True, result
except Exception as e:
logger.error(f"[Controller] Error getting action schema: {str(e)}")
traceback.print_exc()
return False, {"error": str(e)}
def get_all_available_actions() -> Tuple[bool, Dict[str, Any]]:
"""获取所有设备的可用动作
Returns:
Tuple[bool, Dict]: (是否成功, 所有设备的动作信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, {"error": "Host node not initialized"}
try:
from unilabos.ros.nodes.base_device_node import registered_devices
from unilabos.app.web.utils.action_utils import get_action_info
all_actions = {}
total_action_count = 0
for device_id, device_info in registered_devices.items():
actions = device_info.get("actions", {})
device_actions = {}
for action_name, action_server in actions.items():
try:
action_info = get_action_info(action_server, action_name)
is_busy, current_job = check_device_action_busy(device_id, action_name)
device_actions[action_name] = {
"type_name": action_info.get("type_name", ""),
"action_path": action_info.get("action_path", ""),
"is_busy": is_busy,
"current_job_id": current_job[:8] if current_job else None,
}
total_action_count += 1
except Exception as e:
logger.warning(f"[Controller] Error processing action {device_id}/{action_name}: {str(e)}")
if device_actions:
all_actions[device_id] = {
"actions": device_actions,
"action_count": len(device_actions),
"machine_name": host_node.device_machine_names.get(device_id, "未知"),
}
return True, {
"devices": all_actions,
"device_count": len(all_actions),
"total_action_count": total_action_count,
"timestamp": time.time(),
}
except Exception as e:
logger.error(f"[Controller] Error getting all available actions: {str(e)}")
traceback.print_exc()
return False, {"error": str(e)}

View File

@@ -791,6 +791,16 @@ class HostNode(BaseROS2DeviceNode):
del self._goals[job_id]
self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals")
# 存储结果供 HTTP API 查询
try:
from unilabos.app.web.controller import store_job_result
if goal_status == GoalStatus.STATUS_CANCELED:
store_job_result(job_id, status, return_info, {})
else:
store_job_result(job_id, status, return_info, result_data)
except ImportError:
pass # controller 模块可能未加载
# 发布状态到桥接器
if job_id:
for bridge in self.bridges: