Merge remote-tracking branch 'upstream/dev' into hrdev

This commit is contained in:
ZiWei
2025-11-28 22:51:09 +08:00
26 changed files with 2544 additions and 381 deletions

View File

@@ -0,0 +1,334 @@
# HTTP API 指南
本文档介绍如何通过 HTTP API 与 Uni-Lab-OS 进行交互,包括查询设备、提交任务和获取结果。
## 概述
Uni-Lab-OS 提供 RESTful HTTP API允许外部系统通过标准 HTTP 请求控制实验室设备。API 基于 FastAPI 构建,默认运行在 `http://localhost:8002`
### 基础信息
- **Base URL**: `http://localhost:8002/api/v1`
- **Content-Type**: `application/json`
- **响应格式**: JSON
### 通用响应结构
```json
{
"code": 0,
"data": { ... },
"message": "success"
}
```
| 字段 | 类型 | 说明 |
| --------- | ------ | ------------------ |
| `code` | int | 状态码0 表示成功 |
| `data` | object | 响应数据 |
| `message` | string | 响应消息 |
## 快速开始
以下是一个完整的工作流示例:查询设备 → 获取动作 → 提交任务 → 获取结果。
### 步骤 1: 获取在线设备
```bash
curl -X GET "http://localhost:8002/api/v1/online-devices"
```
**响应示例**:
```json
{
"code": 0,
"data": {
"online_devices": {
"host_node": {
"device_key": "/host_node",
"namespace": "",
"machine_name": "本地",
"uuid": "xxx-xxx-xxx",
"node_name": "host_node"
}
},
"total_count": 1,
"timestamp": 1732612345.123
},
"message": "success"
}
```
### 步骤 2: 获取设备可用动作
```bash
curl -X GET "http://localhost:8002/api/v1/devices/host_node/actions"
```
**响应示例**:
```json
{
"code": 0,
"data": {
"device_id": "host_node",
"actions": {
"test_latency": {
"type_name": "unilabos_msgs.action._empty_in.EmptyIn",
"type_name_convert": "unilabos_msgs/action/_empty_in/EmptyIn",
"action_path": "/devices/host_node/test_latency",
"goal_info": "{}",
"is_busy": false,
"current_job_id": null
},
"create_resource": {
"type_name": "unilabos_msgs.action._resource_create_from_outer_easy.ResourceCreateFromOuterEasy",
"action_path": "/devices/host_node/create_resource",
"goal_info": "{res_id: '', device_id: '', class_name: '', ...}",
"is_busy": false,
"current_job_id": null
}
},
"action_count": 5
},
"message": "success"
}
```
**动作状态字段说明**:
| 字段 | 说明 |
| ---------------- | ----------------------------- |
| `type_name` | 动作类型的完整名称 |
| `action_path` | ROS2 动作路径 |
| `goal_info` | 动作参数模板 |
| `is_busy` | 动作是否正在执行 |
| `current_job_id` | 当前执行的任务 ID如果繁忙 |
### 步骤 3: 提交任务
```bash
curl -X POST "http://localhost:8002/api/v1/job/add" \
-H "Content-Type: application/json" \
-d '{"device_id":"host_node","action":"test_latency","action_args":{}}'
```
**请求体**:
```json
{
"device_id": "host_node",
"action": "test_latency",
"action_args": {}
}
```
**请求参数说明**:
| 字段 | 类型 | 必填 | 说明 |
| ------------- | ------ | ---- | ---------------------------------- |
| `device_id` | string | ✓ | 目标设备 ID |
| `action` | string | ✓ | 动作名称 |
| `action_args` | object | ✓ | 动作参数(根据动作类型不同而变化) |
**响应示例**:
```json
{
"code": 0,
"data": {
"jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd",
"status": 1,
"result": {}
},
"message": "success"
}
```
**任务状态码**:
| 状态码 | 含义 | 说明 |
| ------ | --------- | ------------------------------ |
| 0 | UNKNOWN | 未知状态 |
| 1 | ACCEPTED | 任务已接受,等待执行 |
| 2 | EXECUTING | 任务执行中 |
| 3 | CANCELING | 任务取消中 |
| 4 | SUCCEEDED | 任务成功完成 |
| 5 | CANCELED | 任务已取消 |
| 6 | ABORTED | 任务中止(设备繁忙或执行失败) |
### 步骤 4: 查询任务状态和结果
```bash
curl -X GET "http://localhost:8002/api/v1/job/b6acb586-733a-42ab-9f73-55c9a52aa8bd/status"
```
**响应示例(执行中)**:
```json
{
"code": 0,
"data": {
"jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd",
"status": 2,
"result": {}
},
"message": "success"
}
```
**响应示例(执行完成)**:
```json
{
"code": 0,
"data": {
"jobId": "b6acb586-733a-42ab-9f73-55c9a52aa8bd",
"status": 4,
"result": {
"error": "",
"suc": true,
"return_value": {
"avg_rtt_ms": 103.99,
"avg_time_diff_ms": 7181.55,
"max_time_error_ms": 7210.57,
"task_delay_ms": -1,
"raw_delay_ms": 33.19,
"test_count": 5,
"status": "success"
}
}
},
"message": "success"
}
```
> **注意**: 任务结果在首次查询后会被自动删除,请确保保存返回的结果数据。
## API 端点列表
### 设备相关
| 端点 | 方法 | 说明 |
| ---------------------------------------------------------- | ---- | ---------------------- |
| `/api/v1/online-devices` | GET | 获取在线设备列表 |
| `/api/v1/devices` | GET | 获取设备配置 |
| `/api/v1/devices/{device_id}/actions` | GET | 获取指定设备的可用动作 |
| `/api/v1/devices/{device_id}/actions/{action_name}/schema` | GET | 获取动作参数 Schema |
| `/api/v1/actions` | GET | 获取所有设备的可用动作 |
### 任务相关
| 端点 | 方法 | 说明 |
| ----------------------------- | ---- | ------------------ |
| `/api/v1/job/add` | POST | 提交新任务 |
| `/api/v1/job/{job_id}/status` | GET | 查询任务状态和结果 |
### 资源相关
| 端点 | 方法 | 说明 |
| ------------------- | ---- | ------------ |
| `/api/v1/resources` | GET | 获取资源列表 |
## 常见动作示例
### test_latency - 延迟测试
测试系统延迟,无需参数。
```bash
curl -X POST "http://localhost:8002/api/v1/job/add" \
-H "Content-Type: application/json" \
-d '{"device_id":"host_node","action":"test_latency","action_args":{}}'
```
### create_resource - 创建资源
在设备上创建新资源。
```bash
curl -X POST "http://localhost:8002/api/v1/job/add" \
-H "Content-Type: application/json" \
-d '{
"device_id": "host_node",
"action": "create_resource",
"action_args": {
"res_id": "my_plate",
"device_id": "host_node",
"class_name": "Plate",
"parent": "deck",
"bind_locations": {"x": 0, "y": 0, "z": 0}
}
}'
```
## 错误处理
### 设备繁忙
当设备正在执行其他任务时,提交新任务会返回 `status: 6`ABORTED
```json
{
"code": 0,
"data": {
"jobId": "xxx",
"status": 6,
"result": {}
},
"message": "success"
}
```
此时应等待当前任务完成后重试,或使用 `/devices/{device_id}/actions` 检查动作的 `is_busy` 状态。
### 参数错误
```json
{
"code": 2002,
"data": { ... },
"message": "device_id is required"
}
```
## 轮询策略
推荐的任务状态轮询策略:
```python
import requests
import time
def wait_for_job(job_id, timeout=60, interval=0.5):
"""等待任务完成并返回结果"""
start_time = time.time()
while time.time() - start_time < timeout:
response = requests.get(f"http://localhost:8002/api/v1/job/{job_id}/status")
data = response.json()["data"]
status = data["status"]
if status in (4, 5, 6): # SUCCEEDED, CANCELED, ABORTED
return data
time.sleep(interval)
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")
# 使用示例
response = requests.post(
"http://localhost:8002/api/v1/job/add",
json={"device_id": "host_node", "action": "test_latency", "action_args": {}}
)
job_id = response.json()["data"]["jobId"]
result = wait_for_job(job_id)
print(result)
```
## 相关文档
- [设备注册指南](add_device.md)
- [动作定义指南](add_action.md)
- [网络架构概述](networking_overview.md)

View File

@@ -7,3 +7,17 @@ Uni-Lab-OS 是一个开源的实验室自动化操作系统,提供统一的设
intro.md
```
## 开发者指南
```{toctree}
:maxdepth: 2
developer_guide/http_api.md
developer_guide/networking_overview.md
developer_guide/add_device.md
developer_guide/add_action.md
developer_guide/add_registry.md
developer_guide/add_yaml.md
developer_guide/action_includes.md
```

Binary file not shown.

Before

Width:  |  Height:  |  Size: 326 KiB

After

Width:  |  Height:  |  Size: 262 KiB

View File

@@ -218,7 +218,7 @@ def main():
if hasattr(BasicConfig, "log_level"):
logger.info(f"Log level set to '{BasicConfig.log_level}' from config file.")
configure_logger(loglevel=BasicConfig.log_level)
configure_logger(loglevel=BasicConfig.log_level, working_dir=working_dir)
if args_dict["addr"] == "test":
print_status("使用测试环境地址", "info")
@@ -450,13 +450,13 @@ def main():
start_backend(**args_dict)
start_server(
open_browser=not args_dict["disable_browser"],
port=args_dict["port"],
port=BasicConfig.port,
)
else:
start_backend(**args_dict)
start_server(
open_browser=not args_dict["disable_browser"],
port=args_dict["port"],
port=BasicConfig.port,
)

View File

@@ -51,21 +51,25 @@ class Resp(BaseModel):
class JobAddReq(BaseModel):
device_id: str = Field(examples=["Gripper"], description="device id")
action: str = Field(examples=["_execute_driver_command_async"], description="action name", default="")
action_type: str = Field(examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action name", default="")
action_args: dict = Field(examples=[{'string': 'string'}], description="action name", default="")
task_id: str = Field(examples=["task_id"], description="task uuid")
job_id: str = Field(examples=["job_id"], description="goal uuid")
node_id: str = Field(examples=["node_id"], description="node uuid")
server_info: dict = Field(examples=[{"send_timestamp": 1717000000.0}], description="server info")
action_type: str = Field(
examples=["unilabos_msgs.action._str_single_input.StrSingleInput"], description="action type", default=""
)
action_args: dict = Field(examples=[{"string": "string"}], description="action arguments", default_factory=dict)
task_id: str = Field(examples=["task_id"], description="task uuid (auto-generated if empty)", default="")
job_id: str = Field(examples=["job_id"], description="goal uuid (auto-generated if empty)", default="")
node_id: str = Field(examples=["node_id"], description="node uuid", default="")
server_info: dict = Field(
examples=[{"send_timestamp": 1717000000.0}],
description="server info (auto-generated if empty)",
default_factory=dict,
)
data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}], default={})
data: dict = Field(examples=[{"position": 30, "torque": 5, "action": "push_to"}], default_factory=dict)
class JobStepFinishReq(BaseModel):
token: str = Field(examples=["030944"], description="token")
request_time: str = Field(
examples=["2024-12-12 12:12:12.xxx"], description="requestTime"
)
request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime")
data: dict = Field(
examples=[
{
@@ -83,9 +87,7 @@ class JobStepFinishReq(BaseModel):
class JobPreintakeFinishReq(BaseModel):
token: str = Field(examples=["030944"], description="token")
request_time: str = Field(
examples=["2024-12-12 12:12:12.xxx"], description="requestTime"
)
request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime")
data: dict = Field(
examples=[
{
@@ -102,9 +104,7 @@ class JobPreintakeFinishReq(BaseModel):
class JobFinishReq(BaseModel):
token: str = Field(examples=["030944"], description="token")
request_time: str = Field(
examples=["2024-12-12 12:12:12.xxx"], description="requestTime"
)
request_time: str = Field(examples=["2024-12-12 12:12:12.xxx"], description="requestTime")
data: dict = Field(
examples=[
{
@@ -133,6 +133,10 @@ class JobData(BaseModel):
default=0,
description="0:UNKNOWN, 1:ACCEPTED, 2:EXECUTING, 3:CANCELING, 4:SUCCEEDED, 5:CANCELED, 6:ABORTED",
)
result: dict = Field(
default_factory=dict,
description="Job result data (available when status is SUCCEEDED/CANCELED/ABORTED)",
)
class JobStatusResp(Resp):

View File

@@ -9,13 +9,22 @@ import asyncio
import yaml
from unilabos.app.web.controler import devices, job_add, job_info
from unilabos.app.web.controller import (
devices,
job_add,
job_info,
get_online_devices,
get_device_actions,
get_action_schema,
get_all_available_actions,
)
from unilabos.app.model import (
Resp,
RespCode,
JobStatusResp,
JobAddResp,
JobAddReq,
JobData,
)
from unilabos.app.web.utils.host_utils import get_host_node_info
from unilabos.registry.registry import lab_registry
@@ -1234,6 +1243,65 @@ def get_devices():
return Resp(data=dict(data))
@api.get("/online-devices", summary="Online devices list", response_model=Resp)
def api_get_online_devices():
"""获取在线设备列表
返回当前在线的设备列表包含设备ID、命名空间、机器名等信息
"""
isok, data = get_online_devices()
if not isok:
return Resp(code=RespCode.ErrorHostNotInit, message=data.get("error", "Unknown error"))
return Resp(data=data)
@api.get("/devices/{device_id}/actions", summary="Device actions list", response_model=Resp)
def api_get_device_actions(device_id: str):
"""获取设备可用的动作列表
Args:
device_id: 设备ID
返回指定设备的所有可用动作,包含动作名称、类型、是否繁忙等信息
"""
isok, data = get_device_actions(device_id)
if not isok:
return Resp(code=RespCode.ErrorInvalidReq, message=data.get("error", "Unknown error"))
return Resp(data=data)
@api.get("/devices/{device_id}/actions/{action_name}/schema", summary="Action schema", response_model=Resp)
def api_get_action_schema(device_id: str, action_name: str):
"""获取动作的Schema详情
Args:
device_id: 设备ID
action_name: 动作名称
返回动作的参数Schema、默认值、类型等详细信息
"""
isok, data = get_action_schema(device_id, action_name)
if not isok:
return Resp(code=RespCode.ErrorInvalidReq, message=data.get("error", "Unknown error"))
return Resp(data=data)
@api.get("/actions", summary="All available actions", response_model=Resp)
def api_get_all_actions():
"""获取所有设备的可用动作
返回所有已注册设备的动作列表,包含设备信息和各动作的状态
"""
isok, data = get_all_available_actions()
if not isok:
return Resp(code=RespCode.ErrorHostNotInit, message=data.get("error", "Unknown error"))
return Resp(data=data)
@api.get("/job/{id}/status", summary="Job status", response_model=JobStatusResp)
def job_status(id: str):
"""获取任务状态"""
@@ -1244,11 +1312,22 @@ def job_status(id: str):
@api.post("/job/add", summary="Create job", response_model=JobAddResp)
def post_job_add(req: JobAddReq):
"""创建任务"""
device_id = req.device_id
if not req.data:
return Resp(code=RespCode.ErrorInvalidReq, message="Invalid request data")
# 检查必要参数device_id 和 action
if not req.device_id:
return JobAddResp(
data=JobData(jobId="", status=6),
code=RespCode.ErrorInvalidReq,
message="device_id is required",
)
action_name = req.data.get("action", req.action) if req.data else req.action
if not action_name:
return JobAddResp(
data=JobData(jobId="", status=6),
code=RespCode.ErrorInvalidReq,
message="action is required",
)
req.device_id = device_id
data = job_add(req)
return JobAddResp(data=data)

View File

@@ -1,45 +0,0 @@
import json
import traceback
import uuid
from unilabos.app.model import JobAddReq, JobData
from unilabos.ros.nodes.presets.host_node import HostNode
from unilabos.utils.type_check import serialize_result_info
def get_resources() -> tuple:
if HostNode.get_instance() is None:
return False, "Host node not initialized"
return True, HostNode.get_instance().resources_config
def devices() -> tuple:
if HostNode.get_instance() is None:
return False, "Host node not initialized"
return True, HostNode.get_instance().devices_config
def job_info(id: str):
get_goal_status = HostNode.get_instance().get_goal_status(id)
return JobData(jobId=id, status=get_goal_status)
def job_add(req: JobAddReq) -> JobData:
if req.job_id is None:
req.job_id = str(uuid.uuid4())
action_name = req.data["action"]
action_type = req.data.get("action_type", "LocalUnknown")
action_args = req.data.get("action_kwargs", None) # 兼容老版本,后续删除
if action_args is None:
action_args = req.data.get("action_args")
else:
if "command" in action_args:
action_args = action_args["command"]
# print(f"job_add:{req.device_id} {action_name} {action_kwargs}")
try:
HostNode.get_instance().send_goal(req.device_id, action_type=action_type, action_name=action_name, action_kwargs=action_args, goal_uuid=req.job_id, server_info=req.server_info)
except Exception as e:
for bridge in HostNode.get_instance().bridges:
traceback.print_exc()
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status({}, req.job_id, "failed", serialize_result_info(traceback.format_exc(), False, {}))
return JobData(jobId=req.job_id)

View File

@@ -0,0 +1,587 @@
"""
Web API Controller
提供Web API的控制器函数处理设备、任务和动作相关的业务逻辑
"""
import threading
import time
import traceback
import uuid
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, Tuple
from unilabos.app.model import JobAddReq, JobData
from unilabos.ros.nodes.presets.host_node import HostNode
from unilabos.utils import logger
@dataclass
class JobResult:
"""任务结果数据"""
job_id: str
status: int # 4:SUCCEEDED, 5:CANCELED, 6:ABORTED
result: Dict[str, Any] = field(default_factory=dict)
feedback: Dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=time.time)
class JobResultStore:
"""任务结果存储(单例)"""
_instance: Optional["JobResultStore"] = None
_lock = threading.Lock()
def __init__(self):
if not hasattr(self, "_initialized"):
self._results: Dict[str, JobResult] = {}
self._results_lock = threading.RLock()
self._initialized = True
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def store_result(
self, job_id: str, status: int, result: Optional[Dict[str, Any]], feedback: Optional[Dict[str, Any]] = None
):
"""存储任务结果"""
with self._results_lock:
self._results[job_id] = JobResult(
job_id=job_id,
status=status,
result=result or {},
feedback=feedback or {},
timestamp=time.time(),
)
logger.debug(f"[JobResultStore] Stored result for job {job_id[:8]}, status={status}")
def get_and_remove(self, job_id: str) -> Optional[JobResult]:
"""获取并删除任务结果"""
with self._results_lock:
result = self._results.pop(job_id, None)
if result:
logger.debug(f"[JobResultStore] Retrieved and removed result for job {job_id[:8]}")
return result
def get_result(self, job_id: str) -> Optional[JobResult]:
"""仅获取任务结果(不删除)"""
with self._results_lock:
return self._results.get(job_id)
def cleanup_old_results(self, max_age_seconds: float = 3600):
"""清理过期的结果"""
current_time = time.time()
with self._results_lock:
expired_jobs = [
job_id for job_id, result in self._results.items() if current_time - result.timestamp > max_age_seconds
]
for job_id in expired_jobs:
del self._results[job_id]
logger.debug(f"[JobResultStore] Cleaned up expired result for job {job_id[:8]}")
# 全局结果存储实例
job_result_store = JobResultStore()
def store_job_result(
job_id: str, status: str, result: Optional[Dict[str, Any]], feedback: Optional[Dict[str, Any]] = None
):
"""存储任务结果(供外部调用)
Args:
job_id: 任务ID
status: 状态字符串 ("success", "failed", "cancelled")
result: 结果数据
feedback: 反馈数据
"""
# 转换状态字符串为整数
status_map = {
"success": 4, # SUCCEEDED
"failed": 6, # ABORTED
"cancelled": 5, # CANCELED
"running": 2, # EXECUTING
}
status_int = status_map.get(status, 0)
# 只存储最终状态
if status_int in (4, 5, 6):
job_result_store.store_result(job_id, status_int, result, feedback)
def get_resources() -> Tuple[bool, Any]:
"""获取资源配置
Returns:
Tuple[bool, Any]: (是否成功, 资源配置或错误信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, "Host node not initialized"
return True, host_node.resources_config
def devices() -> Tuple[bool, Any]:
"""获取设备配置
Returns:
Tuple[bool, Any]: (是否成功, 设备配置或错误信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, "Host node not initialized"
return True, host_node.devices_config
def job_info(job_id: str, remove_after_read: bool = True) -> JobData:
"""获取任务信息
Args:
job_id: 任务ID
remove_after_read: 是否在读取后删除结果默认True
Returns:
JobData: 任务数据
"""
# 首先检查结果存储中是否有已完成的结果
if remove_after_read:
stored_result = job_result_store.get_and_remove(job_id)
else:
stored_result = job_result_store.get_result(job_id)
if stored_result:
# 有存储的结果,直接返回
return JobData(
jobId=job_id,
status=stored_result.status,
result=stored_result.result,
)
# 没有存储的结果,从 HostNode 获取当前状态
host_node = HostNode.get_instance(0)
if host_node is None:
return JobData(jobId=job_id, status=0)
get_goal_status = host_node.get_goal_status(job_id)
return JobData(jobId=job_id, status=get_goal_status)
def check_device_action_busy(device_id: str, action_name: str) -> Tuple[bool, Optional[str]]:
"""检查设备动作是否正在执行(被占用)
Args:
device_id: 设备ID
action_name: 动作名称
Returns:
Tuple[bool, Optional[str]]: (是否繁忙, 当前执行的job_id或None)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, None
device_action_key = f"/devices/{device_id}/{action_name}"
# 检查 _device_action_status 中是否有正在执行的任务
if device_action_key in host_node._device_action_status:
status = host_node._device_action_status[device_action_key]
if status.job_ids:
# 返回第一个正在执行的job_id
current_job_id = next(iter(status.job_ids.keys()), None)
return True, current_job_id
return False, None
def _get_action_type(device_id: str, action_name: str) -> Optional[str]:
"""从注册表自动获取动作类型
Args:
device_id: 设备ID
action_name: 动作名称
Returns:
动作类型字符串未找到返回None
"""
try:
from unilabos.ros.nodes.base_device_node import registered_devices
# 方法1: 从运行时注册设备获取
if device_id in registered_devices:
device_info = registered_devices[device_id]
base_node = device_info.get("base_node_instance")
if base_node and hasattr(base_node, "_action_value_mappings"):
action_mappings = base_node._action_value_mappings
# 尝试直接匹配或 auto- 前缀匹配
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
action_type = action_mappings[key].get("type")
if action_type:
# 转换为字符串格式
if hasattr(action_type, "__module__") and hasattr(action_type, "__name__"):
return f"{action_type.__module__}.{action_type.__name__}"
return str(action_type)
# 方法2: 从lab_registry获取
from unilabos.registry.registry import lab_registry
host_node = HostNode.get_instance(0)
if host_node and lab_registry:
devices_config = host_node.devices_config
device_class = None
for tree in devices_config.trees:
node = tree.root_node
if node.res_content.id == device_id:
device_class = node.res_content.klass
break
if device_class and device_class in lab_registry.device_type_registry:
device_type_info = lab_registry.device_type_registry[device_class]
class_info = device_type_info.get("class", {})
action_mappings = class_info.get("action_value_mappings", {})
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
action_type = action_mappings[key].get("type")
if action_type:
if hasattr(action_type, "__module__") and hasattr(action_type, "__name__"):
return f"{action_type.__module__}.{action_type.__name__}"
return str(action_type)
except Exception as e:
logger.warning(f"[Controller] Failed to get action type for {device_id}/{action_name}: {str(e)}")
return None
def job_add(req: JobAddReq) -> JobData:
"""添加任务(检查设备是否繁忙,繁忙则返回失败)
Args:
req: 任务添加请求
Returns:
JobData: 任务数据(包含状态)
"""
# 服务端自动生成 job_id 和 task_id
job_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
# 服务端自动生成 server_info
server_info = {"send_timestamp": time.time()}
host_node = HostNode.get_instance(0)
if host_node is None:
logger.error(f"[Controller] Host node not initialized for job: {job_id[:8]}")
return JobData(jobId=job_id, status=6) # 6 = ABORTED
# 解析动作信息
action_name = req.data.get("action", req.action) if req.data else req.action
action_args = req.data.get("action_kwargs") or req.data.get("action_args") if req.data else req.action_args
if action_args is None:
action_args = req.action_args or {}
elif isinstance(action_args, dict) and "command" in action_args:
action_args = action_args["command"]
# 自动获取 action_type
action_type = _get_action_type(req.device_id, action_name)
if action_type is None:
logger.error(f"[Controller] Action type not found for {req.device_id}/{action_name}")
return JobData(jobId=job_id, status=6) # ABORTED
# 检查设备动作是否繁忙
is_busy, current_job_id = check_device_action_busy(req.device_id, action_name)
if is_busy:
logger.warning(
f"[Controller] Device action busy: {req.device_id}/{action_name}, "
f"current job: {current_job_id[:8] if current_job_id else 'unknown'}"
)
# 返回失败状态status=6 表示 ABORTED
return JobData(jobId=job_id, status=6)
# 设备空闲,提交任务执行
try:
from unilabos.app.ws_client import QueueItem
device_action_key = f"/devices/{req.device_id}/{action_name}"
queue_item = QueueItem(
task_type="job_call_back_status",
device_id=req.device_id,
action_name=action_name,
task_id=task_id,
job_id=job_id,
device_action_key=device_action_key,
)
host_node.send_goal(
queue_item,
action_type=action_type,
action_kwargs=action_args,
server_info=server_info,
)
logger.info(f"[Controller] Job submitted: {job_id[:8]} -> {req.device_id}/{action_name}")
# 返回已接受状态status=1 表示 ACCEPTED
return JobData(jobId=job_id, status=1)
except ValueError as e:
# ActionClient not found 等错误
logger.error(f"[Controller] Action not available: {str(e)}")
return JobData(jobId=job_id, status=6) # ABORTED
except Exception as e:
logger.error(f"[Controller] Error submitting job: {str(e)}")
traceback.print_exc()
return JobData(jobId=job_id, status=6) # ABORTED
def get_online_devices() -> Tuple[bool, Dict[str, Any]]:
"""获取在线设备列表
Returns:
Tuple[bool, Dict]: (是否成功, 在线设备信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, {"error": "Host node not initialized"}
try:
from unilabos.ros.nodes.base_device_node import registered_devices
online_devices = {}
for device_key in host_node._online_devices:
# device_key 格式: "namespace/device_id"
parts = device_key.split("/")
if len(parts) >= 2:
device_id = parts[-1]
else:
device_id = device_key
# 获取设备详细信息
device_info = registered_devices.get(device_id, {})
machine_name = host_node.device_machine_names.get(device_id, "未知")
online_devices[device_id] = {
"device_key": device_key,
"namespace": host_node.devices_names.get(device_id, ""),
"machine_name": machine_name,
"uuid": device_info.get("uuid", "") if device_info else "",
"node_name": device_info.get("node_name", "") if device_info else "",
}
return True, {
"online_devices": online_devices,
"total_count": len(online_devices),
"timestamp": time.time(),
}
except Exception as e:
logger.error(f"[Controller] Error getting online devices: {str(e)}")
traceback.print_exc()
return False, {"error": str(e)}
def get_device_actions(device_id: str) -> Tuple[bool, Dict[str, Any]]:
"""获取设备可用的动作列表
Args:
device_id: 设备ID
Returns:
Tuple[bool, Dict]: (是否成功, 动作列表信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, {"error": "Host node not initialized"}
try:
from unilabos.ros.nodes.base_device_node import registered_devices
from unilabos.app.web.utils.action_utils import get_action_info
# 检查设备是否已注册
if device_id not in registered_devices:
return False, {"error": f"Device not found: {device_id}"}
device_info = registered_devices[device_id]
actions = device_info.get("actions", {})
actions_list = {}
for action_name, action_server in actions.items():
try:
action_info = get_action_info(action_server, action_name)
# 检查动作是否繁忙
is_busy, current_job = check_device_action_busy(device_id, action_name)
actions_list[action_name] = {
**action_info,
"is_busy": is_busy,
"current_job_id": current_job[:8] if current_job else None,
}
except Exception as e:
logger.warning(f"[Controller] Error getting action info for {action_name}: {str(e)}")
actions_list[action_name] = {
"type_name": "unknown",
"action_path": f"/devices/{device_id}/{action_name}",
"is_busy": False,
"error": str(e),
}
return True, {
"device_id": device_id,
"actions": actions_list,
"action_count": len(actions_list),
}
except Exception as e:
logger.error(f"[Controller] Error getting device actions: {str(e)}")
traceback.print_exc()
return False, {"error": str(e)}
def get_action_schema(device_id: str, action_name: str) -> Tuple[bool, Dict[str, Any]]:
"""获取动作的Schema详情
Args:
device_id: 设备ID
action_name: 动作名称
Returns:
Tuple[bool, Dict]: (是否成功, Schema信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, {"error": "Host node not initialized"}
try:
from unilabos.registry.registry import lab_registry
from unilabos.ros.nodes.base_device_node import registered_devices
result = {
"device_id": device_id,
"action_name": action_name,
"schema": None,
"goal_default": None,
"action_type": None,
"is_busy": False,
}
# 检查动作是否繁忙
is_busy, current_job = check_device_action_busy(device_id, action_name)
result["is_busy"] = is_busy
result["current_job_id"] = current_job[:8] if current_job else None
# 方法1: 从 registered_devices 获取运行时信息
if device_id in registered_devices:
device_info = registered_devices[device_id]
base_node = device_info.get("base_node_instance")
if base_node and hasattr(base_node, "_action_value_mappings"):
action_mappings = base_node._action_value_mappings
if action_name in action_mappings:
mapping = action_mappings[action_name]
result["schema"] = mapping.get("schema")
result["goal_default"] = mapping.get("goal_default")
result["action_type"] = str(mapping.get("type", ""))
# 方法2: 从 lab_registry 获取注册表信息(如果运行时没有)
if result["schema"] is None and lab_registry:
# 尝试查找设备类型
devices_config = host_node.devices_config
device_class = None
# 从配置中获取设备类型
for tree in devices_config.trees:
node = tree.root_node
if node.res_content.id == device_id:
device_class = node.res_content.klass
break
if device_class and device_class in lab_registry.device_type_registry:
device_type_info = lab_registry.device_type_registry[device_class]
class_info = device_type_info.get("class", {})
action_mappings = class_info.get("action_value_mappings", {})
# 尝试直接匹配或 auto- 前缀匹配
for key in [action_name, f"auto-{action_name}"]:
if key in action_mappings:
mapping = action_mappings[key]
result["schema"] = mapping.get("schema")
result["goal_default"] = mapping.get("goal_default")
result["action_type"] = str(mapping.get("type", ""))
result["handles"] = mapping.get("handles", {})
result["placeholder_keys"] = mapping.get("placeholder_keys", {})
break
if result["schema"] is None:
return False, {"error": f"Action schema not found: {device_id}/{action_name}"}
return True, result
except Exception as e:
logger.error(f"[Controller] Error getting action schema: {str(e)}")
traceback.print_exc()
return False, {"error": str(e)}
def get_all_available_actions() -> Tuple[bool, Dict[str, Any]]:
"""获取所有设备的可用动作
Returns:
Tuple[bool, Dict]: (是否成功, 所有设备的动作信息)
"""
host_node = HostNode.get_instance(0)
if host_node is None:
return False, {"error": "Host node not initialized"}
try:
from unilabos.ros.nodes.base_device_node import registered_devices
from unilabos.app.web.utils.action_utils import get_action_info
all_actions = {}
total_action_count = 0
for device_id, device_info in registered_devices.items():
actions = device_info.get("actions", {})
device_actions = {}
for action_name, action_server in actions.items():
try:
action_info = get_action_info(action_server, action_name)
is_busy, current_job = check_device_action_busy(device_id, action_name)
device_actions[action_name] = {
"type_name": action_info.get("type_name", ""),
"action_path": action_info.get("action_path", ""),
"is_busy": is_busy,
"current_job_id": current_job[:8] if current_job else None,
}
total_action_count += 1
except Exception as e:
logger.warning(f"[Controller] Error processing action {device_id}/{action_name}: {str(e)}")
if device_actions:
all_actions[device_id] = {
"actions": device_actions,
"action_count": len(device_actions),
"machine_name": host_node.device_machine_names.get(device_id, "未知"),
}
return True, {
"devices": all_actions,
"device_count": len(all_actions),
"total_action_count": total_action_count,
"timestamp": time.time(),
}
except Exception as e:
logger.error(f"[Controller] Error getting all available actions: {str(e)}")
traceback.print_exc()
return False, {"error": str(e)}

View File

@@ -147,6 +147,9 @@ class LiquidHandlerMiddleware(LiquidHandler):
offsets: Optional[List[Coordinate]] = None,
**backend_kwargs,
):
# 如果 use_channels 为 None使用默认值所有通道
if use_channels is None:
use_channels = list(range(self.channel_num))
if not offsets or (isinstance(offsets, list) and len(offsets) != len(use_channels)):
offsets = [Coordinate.zero()] * len(use_channels)
if self._simulator:
@@ -759,7 +762,7 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
blow_out_air_volume=current_dis_blow_out_air_volume,
spread=spread,
)
if delays is not None:
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets)
await self.discard_tips()
@@ -833,17 +836,19 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
spread=spread,
)
if delays is not None:
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.mix(
targets=[targets[_]],
mix_time=mix_time,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if delays is not None:
# 只有在 mix_time 有效时才调用 mix
if mix_time is not None and mix_time > 0:
await self.mix(
targets=[targets[_]],
mix_time=mix_time,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(targets[_])
await self.discard_tips()
@@ -893,18 +898,20 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
blow_out_air_volume=current_dis_blow_out_air_volume,
spread=spread,
)
if delays is not None:
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.mix(
targets=current_targets,
mix_time=mix_time,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if delays is not None:
# 只有在 mix_time 有效时才调用 mix
if mix_time is not None and mix_time > 0:
await self.mix(
targets=current_targets,
mix_time=mix_time,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets)
await self.discard_tips()
@@ -942,60 +949,146 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
delays: Optional[List[int]] = None,
none_keys: List[str] = [],
):
"""Transfer liquid from each *source* well/plate to the corresponding *target*.
"""Transfer liquid with automatic mode detection.
Supports three transfer modes:
1. One-to-many (1 source -> N targets): Distribute from one source to multiple targets
2. One-to-one (N sources -> N targets): Standard transfer, each source to corresponding target
3. Many-to-one (N sources -> 1 target): Combine multiple sources into one target
Parameters
----------
asp_vols, dis_vols
Single volume (µL) or list matching the number of transfers.
Single volume (µL) or list. Automatically expanded based on transfer mode.
sources, targets
Samelength sequences of containers (wells or plates). In 96well mode
each must contain exactly one plate.
Containers (wells or plates). Length determines transfer mode:
- len(sources) == 1, len(targets) > 1: One-to-many mode
- len(sources) == len(targets): One-to-one mode
- len(sources) > 1, len(targets) == 1: Many-to-one mode
tip_racks
One or more TipRacks providing fresh tips.
is_96_well
Set *True* to use the 96channel head.
"""
# 确保 use_channels 有默认值
if use_channels is None:
use_channels = [0] if self.channel_num >= 1 else list(range(self.channel_num))
if is_96_well:
pass # This mode is not verified.
else:
if len(asp_vols) != len(targets):
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
# 转换体积参数为列表
if isinstance(asp_vols, (int, float)):
asp_vols = [float(asp_vols)]
else:
asp_vols = [float(v) for v in asp_vols]
if isinstance(dis_vols, (int, float)):
dis_vols = [float(dis_vols)]
else:
dis_vols = [float(v) for v in dis_vols]
# 识别传输模式
num_sources = len(sources)
num_targets = len(targets)
if num_sources == 1 and num_targets > 1:
# 模式1: 一对多 (1 source -> N targets)
await self._transfer_one_to_many(
sources[0], targets, tip_racks, use_channels,
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
offsets, touch_tip, liquid_height, blow_out_air_volume,
spread, mix_stage, mix_times, mix_vol, mix_rate,
mix_liquid_height, delays
)
elif num_sources > 1 and num_targets == 1:
# 模式2: 多对一 (N sources -> 1 target)
await self._transfer_many_to_one(
sources, targets[0], tip_racks, use_channels,
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
offsets, touch_tip, liquid_height, blow_out_air_volume,
spread, mix_stage, mix_times, mix_vol, mix_rate,
mix_liquid_height, delays
)
elif num_sources == num_targets:
# 模式3: 一对一 (N sources -> N targets) - 原有逻辑
await self._transfer_one_to_one(
sources, targets, tip_racks, use_channels,
asp_vols, dis_vols, asp_flow_rates, dis_flow_rates,
offsets, touch_tip, liquid_height, blow_out_air_volume,
spread, mix_stage, mix_times, mix_vol, mix_rate,
mix_liquid_height, delays
)
else:
raise ValueError(
f"Unsupported transfer mode: {num_sources} sources -> {num_targets} targets. "
"Supported modes: 1->N, N->1, or N->N."
)
# 首先应该对任务分组然后每次1个/8个进行操作处理
if len(use_channels) == 1:
for _ in range(len(targets)):
tip = []
for ___ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
async def _transfer_one_to_one(
self,
sources: Sequence[Container],
targets: Sequence[Container],
tip_racks: Sequence[TipRack],
use_channels: List[int],
asp_vols: List[float],
dis_vols: List[float],
asp_flow_rates: Optional[List[Optional[float]]],
dis_flow_rates: Optional[List[Optional[float]]],
offsets: Optional[List[Coordinate]],
touch_tip: bool,
liquid_height: Optional[List[Optional[float]]],
blow_out_air_volume: Optional[List[Optional[float]]],
spread: Literal["wide", "tight", "custom"],
mix_stage: Optional[Literal["none", "before", "after", "both"]],
mix_times: Optional[int],
mix_vol: Optional[int],
mix_rate: Optional[int],
mix_liquid_height: Optional[float],
delays: Optional[List[int]],
):
"""一对一传输模式N sources -> N targets"""
# 验证参数长度
if len(asp_vols) != len(targets):
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `targets` {len(targets)}.")
if len(dis_vols) != len(targets):
raise ValueError(f"Length of `dis_vols` {len(dis_vols)} must match `targets` {len(targets)}.")
if len(sources) != len(targets):
raise ValueError(f"Length of `sources` {len(sources)} must match `targets` {len(targets)}.")
await self.aspirate(
resources=[sources[_]],
vols=[asp_vols[_]],
use_channels=use_channels,
flow_rates=[asp_flow_rates[0]] if asp_flow_rates else None,
offsets=[offsets[0]] if offsets else None,
liquid_height=[liquid_height[0]] if liquid_height else None,
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=[targets[_]],
vols=[dis_vols[_]],
use_channels=use_channels,
flow_rates=[dis_flow_rates[1]] if dis_flow_rates else None,
offsets=[offsets[1]] if offsets else None,
blow_out_air_volume=[blow_out_air_volume[1]] if blow_out_air_volume else None,
liquid_height=[liquid_height[1]] if liquid_height else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
if len(use_channels) == 1:
for _ in range(len(targets)):
tip = []
for ___ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.aspirate(
resources=[sources[_]],
vols=[asp_vols[_]],
use_channels=use_channels,
flow_rates=[asp_flow_rates[_]] if asp_flow_rates and len(asp_flow_rates) > _ else None,
offsets=[offsets[_]] if offsets and len(offsets) > _ else None,
liquid_height=[liquid_height[_]] if liquid_height and len(liquid_height) > _ else None,
blow_out_air_volume=[blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=[targets[_]],
vols=[dis_vols[_]],
use_channels=use_channels,
flow_rates=[dis_flow_rates[_]] if dis_flow_rates and len(dis_flow_rates) > _ else None,
offsets=[offsets[_]] if offsets and len(offsets) > _ else None,
blow_out_air_volume=[blow_out_air_volume[_]] if blow_out_air_volume and len(blow_out_air_volume) > _ else None,
liquid_height=[liquid_height[_]] if liquid_height and len(liquid_height) > _ else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[targets[_]],
mix_time=mix_times,
@@ -1004,63 +1097,60 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(targets[_])
await self.discard_tips()
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(targets[_])
await self.discard_tips(use_channels=use_channels)
elif len(use_channels) == 8:
# 对于8个的情况需要判断此时任务是不是能被8通道移液站来成功处理
if len(targets) % 8 != 0:
raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
elif len(use_channels) == 8:
if len(targets) % 8 != 0:
raise ValueError(f"Length of `targets` {len(targets)} must be a multiple of 8 for 8-channel mode.")
# 8个8个来取任务序列
for i in range(0, len(targets), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
current_targets = targets[i:i + 8]
current_reagent_sources = sources[i:i + 8]
current_asp_vols = asp_vols[i:i + 8]
current_dis_vols = dis_vols[i:i + 8]
current_asp_flow_rates = asp_flow_rates[i:i + 8] if asp_flow_rates else None
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None
for i in range(0, len(targets), 8):
# 取出8个任务
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
current_targets = targets[i:i + 8]
current_reagent_sources = sources[i:i + 8]
current_asp_vols = asp_vols[i:i + 8]
current_dis_vols = dis_vols[i:i + 8]
current_asp_flow_rates = asp_flow_rates[i:i + 8]
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_offset = offsets[-i*8-8:len(offsets)-i*8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_liquid_height = liquid_height[-i*8-8:len(liquid_height)-i*8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[-i*8-8:len(blow_out_air_volume)-i*8] if blow_out_air_volume else [None] * 8
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else [None] * 8
await self.aspirate(
resources=current_reagent_sources,
vols=current_asp_vols,
use_channels=use_channels,
flow_rates=current_asp_flow_rates,
offsets=current_asp_offset,
blow_out_air_volume=current_asp_blow_out_air_volume,
liquid_height=current_asp_liquid_height,
spread=spread,
)
await self.aspirate(
resources=current_reagent_sources,
vols=current_asp_vols,
use_channels=use_channels,
flow_rates=current_asp_flow_rates,
offsets=current_asp_offset,
blow_out_air_volume=current_asp_blow_out_air_volume,
liquid_height=current_asp_liquid_height,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=current_targets,
vols=current_dis_vols,
use_channels=use_channels,
flow_rates=current_dis_flow_rates,
offsets=current_dis_offset,
blow_out_air_volume=current_dis_blow_out_air_volume,
liquid_height=current_dis_liquid_height,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
if delays is not None:
await self.custom_delay(seconds=delays[0])
await self.dispense(
resources=current_targets,
vols=current_dis_vols,
use_channels=use_channels,
flow_rates=current_dis_flow_rates,
offsets=current_dis_offset,
blow_out_air_volume=current_dis_blow_out_air_volume,
liquid_height=current_dis_liquid_height,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=current_targets,
mix_time=mix_times,
@@ -1069,10 +1159,363 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if delays is not None:
await self.custom_delay(seconds=delays[1])
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.touch_tip(current_targets)
await self.discard_tips([0,1,2,3,4,5,6,7])
async def _transfer_one_to_many(
self,
source: Container,
targets: Sequence[Container],
tip_racks: Sequence[TipRack],
use_channels: List[int],
asp_vols: List[float],
dis_vols: List[float],
asp_flow_rates: Optional[List[Optional[float]]],
dis_flow_rates: Optional[List[Optional[float]]],
offsets: Optional[List[Coordinate]],
touch_tip: bool,
liquid_height: Optional[List[Optional[float]]],
blow_out_air_volume: Optional[List[Optional[float]]],
spread: Literal["wide", "tight", "custom"],
mix_stage: Optional[Literal["none", "before", "after", "both"]],
mix_times: Optional[int],
mix_vol: Optional[int],
mix_rate: Optional[int],
mix_liquid_height: Optional[float],
delays: Optional[List[int]],
):
"""一对多传输模式1 source -> N targets"""
# 验证和扩展体积参数
if len(asp_vols) == 1:
# 如果只提供一个吸液体积,计算总吸液体积(所有分液体积之和)
total_asp_vol = sum(dis_vols)
asp_vol = asp_vols[0] if asp_vols[0] >= total_asp_vol else total_asp_vol
else:
raise ValueError("For one-to-many mode, `asp_vols` should be a single value or list with one element.")
if len(dis_vols) != len(targets):
raise ValueError(f"Length of `dis_vols` {len(dis_vols)} must match `targets` {len(targets)}.")
if len(use_channels) == 1:
# 单通道模式:一次吸液,多次分液
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
# 从源容器吸液(总体积)
await self.aspirate(
resources=[source],
vols=[asp_vol],
use_channels=use_channels,
flow_rates=[asp_flow_rates[0]] if asp_flow_rates and len(asp_flow_rates) > 0 else None,
offsets=[offsets[0]] if offsets and len(offsets) > 0 else None,
liquid_height=[liquid_height[0]] if liquid_height and len(liquid_height) > 0 else None,
blow_out_air_volume=[blow_out_air_volume[0]] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分多次分液到不同的目标容器
for idx, target in enumerate(targets):
await self.dispense(
resources=[target],
vols=[dis_vols[idx]],
use_channels=use_channels,
flow_rates=[dis_flow_rates[idx]] if dis_flow_rates and len(dis_flow_rates) > idx else None,
offsets=[offsets[idx]] if offsets and len(offsets) > idx else None,
blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None,
liquid_height=[liquid_height[idx]] if liquid_height and len(liquid_height) > idx else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[target],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets[idx:idx+1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if touch_tip:
await self.touch_tip([target])
await self.discard_tips(use_channels=use_channels)
elif len(use_channels) == 8:
# 8通道模式需要确保目标数量是8的倍数
if len(targets) % 8 != 0:
raise ValueError(f"For 8-channel mode, number of targets {len(targets)} must be a multiple of 8.")
# 每次处理8个目标
for i in range(0, len(targets), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
current_targets = targets[i:i + 8]
current_dis_vols = dis_vols[i:i + 8]
# 8个通道都从同一个源容器吸液每个通道的吸液体积等于对应的分液体积
current_asp_flow_rates = asp_flow_rates[0:1] * 8 if asp_flow_rates and len(asp_flow_rates) > 0 else None
current_asp_offset = offsets[0:1] * 8 if offsets and len(offsets) > 0 else [None] * 8
current_asp_liquid_height = liquid_height[0:1] * 8 if liquid_height and len(liquid_height) > 0 else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume and len(blow_out_air_volume) > 0 else [None] * 8
# 从源容器吸液8个通道都从同一个源但每个通道的吸液体积不同
await self.aspirate(
resources=[source] * 8, # 8个通道都从同一个源
vols=current_dis_vols, # 每个通道的吸液体积等于对应的分液体积
use_channels=use_channels,
flow_rates=current_asp_flow_rates,
offsets=current_asp_offset,
liquid_height=current_asp_liquid_height,
blow_out_air_volume=current_asp_blow_out_air_volume,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分液到8个目标
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None
current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
await self.dispense(
resources=current_targets,
vols=current_dis_vols,
use_channels=use_channels,
flow_rates=current_dis_flow_rates,
offsets=current_dis_offset,
blow_out_air_volume=current_dis_blow_out_air_volume,
liquid_height=current_dis_liquid_height,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=current_targets,
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if touch_tip:
await self.touch_tip(current_targets)
await self.discard_tips([0,1,2,3,4,5,6,7])
await self.discard_tips([0,1,2,3,4,5,6,7])
async def _transfer_many_to_one(
self,
sources: Sequence[Container],
target: Container,
tip_racks: Sequence[TipRack],
use_channels: List[int],
asp_vols: List[float],
dis_vols: List[float],
asp_flow_rates: Optional[List[Optional[float]]],
dis_flow_rates: Optional[List[Optional[float]]],
offsets: Optional[List[Coordinate]],
touch_tip: bool,
liquid_height: Optional[List[Optional[float]]],
blow_out_air_volume: Optional[List[Optional[float]]],
spread: Literal["wide", "tight", "custom"],
mix_stage: Optional[Literal["none", "before", "after", "both"]],
mix_times: Optional[int],
mix_vol: Optional[int],
mix_rate: Optional[int],
mix_liquid_height: Optional[float],
delays: Optional[List[int]],
):
"""多对一传输模式N sources -> 1 target汇总/混合)"""
# 验证和扩展体积参数
if len(asp_vols) != len(sources):
raise ValueError(f"Length of `asp_vols` {len(asp_vols)} must match `sources` {len(sources)}.")
# 支持两种模式:
# 1. dis_vols 为单个值:所有源汇总,使用总吸液体积或指定分液体积
# 2. dis_vols 长度等于 asp_vols每个源按不同比例分液按比例混合
if len(dis_vols) == 1:
# 模式1使用单个分液体积
total_dis_vol = sum(asp_vols)
dis_vol = dis_vols[0] if dis_vols[0] >= total_dis_vol else total_dis_vol
use_proportional_mixing = False
elif len(dis_vols) == len(asp_vols):
# 模式2按不同比例混合
use_proportional_mixing = True
else:
raise ValueError(
f"For many-to-one mode, `dis_vols` should be a single value or list with length {len(asp_vols)} "
f"(matching `asp_vols`). Got length {len(dis_vols)}."
)
if len(use_channels) == 1:
# 单通道模式:多次吸液,一次分液
# 先混合前(如果需要)
if mix_stage in ["before", "both"] and mix_times is not None and mix_times > 0:
# 注意:在吸液前混合源容器通常不常见,这里跳过
pass
# 从每个源容器吸液并分液到目标容器
for idx, source in enumerate(sources):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
await self.aspirate(
resources=[source],
vols=[asp_vols[idx]],
use_channels=use_channels,
flow_rates=[asp_flow_rates[idx]] if asp_flow_rates and len(asp_flow_rates) > idx else None,
offsets=[offsets[idx]] if offsets and len(offsets) > idx else None,
liquid_height=[liquid_height[idx]] if liquid_height and len(liquid_height) > idx else None,
blow_out_air_volume=[blow_out_air_volume[idx]] if blow_out_air_volume and len(blow_out_air_volume) > idx else None,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分液到目标容器
if use_proportional_mixing:
# 按不同比例混合:使用对应的 dis_vols
dis_vol = dis_vols[idx]
dis_flow_rate = dis_flow_rates[idx] if dis_flow_rates and len(dis_flow_rates) > idx else None
dis_offset = offsets[idx] if offsets and len(offsets) > idx else None
dis_liquid_height = liquid_height[idx] if liquid_height and len(liquid_height) > idx else None
dis_blow_out = blow_out_air_volume[idx] if blow_out_air_volume and len(blow_out_air_volume) > idx else None
else:
# 标准模式:分液体积等于吸液体积
dis_vol = asp_vols[idx]
dis_flow_rate = dis_flow_rates[0] if dis_flow_rates and len(dis_flow_rates) > 0 else None
dis_offset = offsets[0] if offsets and len(offsets) > 0 else None
dis_liquid_height = liquid_height[0] if liquid_height and len(liquid_height) > 0 else None
dis_blow_out = blow_out_air_volume[0] if blow_out_air_volume and len(blow_out_air_volume) > 0 else None
await self.dispense(
resources=[target],
vols=[dis_vol],
use_channels=use_channels,
flow_rates=[dis_flow_rate] if dis_flow_rate is not None else None,
offsets=[dis_offset] if dis_offset is not None else None,
blow_out_air_volume=[dis_blow_out] if dis_blow_out is not None else None,
liquid_height=[dis_liquid_height] if dis_liquid_height is not None else None,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.discard_tips(use_channels=use_channels)
# 最后在目标容器中混合(如果需要)
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[target],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets[0:1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if touch_tip:
await self.touch_tip([target])
elif len(use_channels) == 8:
# 8通道模式需要确保源数量是8的倍数
if len(sources) % 8 != 0:
raise ValueError(f"For 8-channel mode, number of sources {len(sources)} must be a multiple of 8.")
# 每次处理8个源
for i in range(0, len(sources), 8):
tip = []
for _ in range(len(use_channels)):
tip.extend(next(self.current_tip))
await self.pick_up_tips(tip)
current_sources = sources[i:i + 8]
current_asp_vols = asp_vols[i:i + 8]
current_asp_flow_rates = asp_flow_rates[i:i + 8] if asp_flow_rates else None
current_asp_offset = offsets[i:i + 8] if offsets else [None] * 8
current_asp_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_asp_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
# 从8个源容器吸液
await self.aspirate(
resources=current_sources,
vols=current_asp_vols,
use_channels=use_channels,
flow_rates=current_asp_flow_rates,
offsets=current_asp_offset,
blow_out_air_volume=current_asp_blow_out_air_volume,
liquid_height=current_asp_liquid_height,
spread=spread,
)
if delays is not None:
await self.custom_delay(seconds=delays[0])
# 分液到目标容器(每个通道分液到同一个目标)
if use_proportional_mixing:
# 按比例混合:使用对应的 dis_vols
current_dis_vols = dis_vols[i:i + 8]
current_dis_flow_rates = dis_flow_rates[i:i + 8] if dis_flow_rates else None
current_dis_offset = offsets[i:i + 8] if offsets else [None] * 8
current_dis_liquid_height = liquid_height[i:i + 8] if liquid_height else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[i:i + 8] if blow_out_air_volume else [None] * 8
else:
# 标准模式:每个通道分液体积等于其吸液体积
current_dis_vols = current_asp_vols
current_dis_flow_rates = dis_flow_rates[0:1] * 8 if dis_flow_rates else None
current_dis_offset = offsets[0:1] * 8 if offsets else [None] * 8
current_dis_liquid_height = liquid_height[0:1] * 8 if liquid_height else [None] * 8
current_dis_blow_out_air_volume = blow_out_air_volume[0:1] * 8 if blow_out_air_volume else [None] * 8
await self.dispense(
resources=[target] * 8, # 8个通道都分到同一个目标
vols=current_dis_vols,
use_channels=use_channels,
flow_rates=current_dis_flow_rates,
offsets=current_dis_offset,
blow_out_air_volume=current_dis_blow_out_air_volume,
liquid_height=current_dis_liquid_height,
spread=spread,
)
if delays is not None and len(delays) > 1:
await self.custom_delay(seconds=delays[1])
await self.discard_tips([0,1,2,3,4,5,6,7])
# 最后在目标容器中混合(如果需要)
if mix_stage in ["after", "both"] and mix_times is not None and mix_times > 0:
await self.mix(
targets=[target],
mix_time=mix_times,
mix_vol=mix_vol,
offsets=offsets[0:1] if offsets else None,
height_to_bottom=mix_liquid_height if mix_liquid_height else None,
mix_rate=mix_rate if mix_rate else None,
)
if touch_tip:
await self.touch_tip([target])
# except Exception as e:
# traceback.print_exc()

View File

@@ -7,7 +7,7 @@ class VirtualMultiwayValve:
"""
虚拟九通阀门 - 0号位连接transfer pump1-8号位连接其他设备 🔄
"""
def __init__(self, port: str = "VIRTUAL", positions: int = 8):
def __init__(self, port: str = "VIRTUAL", positions: int = 8, **kwargs):
self.port = port
self.max_positions = positions # 1-8号位
self.total_positions = positions + 1 # 0-8号位共9个位置

View File

@@ -2,6 +2,7 @@ from datetime import datetime
import json
import time
from typing import Optional, Dict, Any, List
from typing_extensions import TypedDict
import requests
import pint
from unilabos.devices.workstation.bioyond_studio.config import API_CONFIG
@@ -14,6 +15,14 @@ import sys
from pathlib import Path
import importlib
class ComputeExperimentDesignReturn(TypedDict):
solutions: list
titration: dict
solvents: dict
feeding_order: list
return_info: str
class BioyondDispensingStation(BioyondWorkstation):
def __init__(
self,
@@ -138,85 +147,20 @@ class BioyondDispensingStation(BioyondWorkstation):
wt_percent: str = "0.25",
m_tot: str = "70",
titration_percent: str = "0.03",
) -> dict:
"""计算实验设计参数
参数:
ratio: 化合物配比,支持多种格式:
1. 简化格式(推荐): "MDA:0.5,PAPP:0.5,BTDA:0.95"
2. JSON字符串: '{"MDA": 1, "BTDA": 0.95, "PAPP": 1}'
3. Python字典: {"MDA": 1, "BTDA": 0.95, "PAPP": 1}
wt_percent: 固体重量百分比,默认 0.25
m_tot: 反应混合物总质量(g),默认 70
titration_percent: 滴定溶液百分比,默认 0.03
返回:
包含实验设计参数的字典
"""
) -> ComputeExperimentDesignReturn:
try:
# 1. 参数解析和验证
original_ratio = ratio
if isinstance(ratio, str):
# 尝试解析简化格式: "MDA:0.5,PAPP:0.5,BTDA:0.95"
if ':' in ratio and ',' in ratio:
try:
ratio_dict = {}
pairs = ratio.split(',')
for pair in pairs:
pair = pair.strip()
if ':' in pair:
key, value = pair.split(':', 1)
key = key.strip()
value = value.strip()
try:
ratio_dict[key] = float(value)
except ValueError:
raise BioyondException(f"无法将 '{value}' 转换为数字")
if ratio_dict:
ratio = ratio_dict
self.hardware_interface._logger.info(
f"从简化格式解析 ratio: '{original_ratio}' -> {ratio}"
)
except BioyondException:
raise
except Exception as e:
self.hardware_interface._logger.warning(
f"简化格式解析失败尝试JSON格式: {e}"
)
# 如果不是简化格式或解析失败尝试JSON格式
if isinstance(ratio, str):
try:
ratio = json.loads(ratio)
# 处理可能的多层 JSON 编码
if isinstance(ratio, str):
try:
ratio = json.loads(ratio)
except Exception:
pass
except Exception as e:
raise BioyondException(
f"ratio 参数解析失败: {e}\n"
f"支持的格式:\n"
f" 1. 简化格式(推荐): 'MDA:0.5,PAPP:0.5,BTDA:0.95'\n"
f" 2. JSON格式: '{{\"MDA\": 0.5, \"BTDA\": 0.95, \"PAPP\": 0.5}}'"
)
if not isinstance(ratio, dict):
raise BioyondException(
f"ratio 必须是字典类型或可解析的字符串,当前类型: {type(ratio)}\n"
f"支持的格式:\n"
f" 1. 简化格式(推荐): 'MDA:0.5,PAPP:0.5,BTDA:0.95'\n"
f" 2. JSON格式: '{{\"MDA\": 0.5, \"BTDA\": 0.95, \"PAPP\": 0.5}}'"
)
if not ratio:
raise BioyondException("ratio 参数不能为空")
# 记录解析后的参数用于调试
self.hardware_interface._logger.info(f"最终解析的 ratio 参数: {ratio} (类型: {type(ratio)})")
try:
ratio = json.loads(ratio)
except Exception:
ratio = {}
root = str(Path(__file__).resolve().parents[3])
if root not in sys.path:
sys.path.append(root)
try:
mod = importlib.import_module("tem.compute")
except Exception as e:
raise BioyondException(f"无法导入计算模块: {e}")
try:
wp = float(wt_percent) if isinstance(wt_percent, str) else wt_percent
mt = float(m_tot) if isinstance(m_tot, str) else m_tot
@@ -1299,25 +1243,6 @@ class BioyondDispensingStation(BioyondWorkstation):
'actualVolume': actual_volume
}
def scheduler_start(self) -> dict:
"""启动调度器 - 启动Bioyond工作站的任务调度器开始执行队列中的任务
Returns:
dict: 包含return_info的字典return_info为整型(1=成功)
Raises:
BioyondException: 调度器启动失败时抛出异常
"""
result = self.hardware_interface.scheduler_start()
self.hardware_interface._logger.info(f"调度器启动结果: {result}")
if result != 1:
error_msg = "启动调度器失败: 有未处理错误调度无法启动。请检查Bioyond系统状态。"
self.hardware_interface._logger.error(error_msg)
raise BioyondException(error_msg)
return {"return_info": result}
# 等待多个任务完成并获取实验报告
def wait_for_multiple_orders_and_get_reports(self,
batch_create_result: str = None,

View File

@@ -147,7 +147,7 @@ class WorkstationBase(ABC):
def __init__(
self,
deck: Deck,
deck: Optional[Deck],
*args,
**kwargs, # 必须有kwargs
):
@@ -349,5 +349,5 @@ class WorkstationBase(ABC):
class ProtocolNode(WorkstationBase):
def __init__(self, deck: Optional[PLRResource], *args, **kwargs):
def __init__(self, protocol_type: List[str], deck: Optional[PLRResource], *args, **kwargs):
super().__init__(deck, *args, **kwargs)

View File

@@ -83,6 +83,96 @@ workstation.bioyond_dispensing_station:
title: batch_create_diamine_solution_tasks参数
type: object
type: UniLabJsonCommand
auto-brief_step_parameters:
feedback: {}
goal: {}
goal_default:
data: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
data:
type: object
required:
- data
type: object
result: {}
required:
- goal
title: brief_step_parameters参数
type: object
type: UniLabJsonCommand
auto-compute_experiment_design:
feedback: {}
goal: {}
goal_default:
m_tot: '70'
ratio: null
titration_percent: '0.03'
wt_percent: '0.25'
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
m_tot:
default: '70'
type: string
ratio:
type: object
titration_percent:
default: '0.03'
type: string
wt_percent:
default: '0.25'
type: string
required:
- ratio
type: object
result:
properties:
feeding_order:
items: {}
title: Feeding Order
type: array
return_info:
title: Return Info
type: string
solutions:
items: {}
title: Solutions
type: array
solvents:
additionalProperties: true
title: Solvents
type: object
titration:
additionalProperties: true
title: Titration
type: object
required:
- solutions
- titration
- solvents
- feeding_order
- return_info
title: ComputeExperimentDesignReturn
type: object
required:
- goal
title: compute_experiment_design参数
type: object
type: UniLabJsonCommand
auto-process_order_finish_report:
feedback: {}
goal: {}
@@ -112,6 +202,85 @@ workstation.bioyond_dispensing_station:
title: process_order_finish_report参数
type: object
type: UniLabJsonCommand
auto-project_order_report:
feedback: {}
goal: {}
goal_default:
order_id: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
order_id:
type: string
required:
- order_id
type: object
result: {}
required:
- goal
title: project_order_report参数
type: object
type: UniLabJsonCommand
auto-query_resource_by_name:
feedback: {}
goal: {}
goal_default:
material_name: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
material_name:
type: string
required:
- material_name
type: object
result: {}
required:
- goal
title: query_resource_by_name参数
type: object
type: UniLabJsonCommand
auto-transfer_materials_to_reaction_station:
feedback: {}
goal: {}
goal_default:
target_device_id: null
transfer_groups: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
target_device_id:
type: string
transfer_groups:
type: array
required:
- target_device_id
- transfer_groups
type: object
result: {}
required:
- goal
title: transfer_materials_to_reaction_station参数
type: object
type: UniLabJsonCommand
auto-wait_for_multiple_orders_and_get_reports:
feedback: {}
goal: {}
@@ -144,6 +313,31 @@ workstation.bioyond_dispensing_station:
title: wait_for_multiple_orders_and_get_reports参数
type: object
type: UniLabJsonCommand
auto-workflow_sample_locations:
feedback: {}
goal: {}
goal_default:
workflow_id: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
workflow_id:
type: string
required:
- workflow_id
type: object
result: {}
required:
- goal
title: workflow_sample_locations参数
type: object
type: UniLabJsonCommand
create_90_10_vial_feeding_task:
feedback: {}
goal:

View File

@@ -5,6 +5,96 @@ bioyond_dispensing_station:
- bioyond_dispensing_station
class:
action_value_mappings:
auto-brief_step_parameters:
feedback: {}
goal: {}
goal_default:
data: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
data:
type: object
required:
- data
type: object
result: {}
required:
- goal
title: brief_step_parameters参数
type: object
type: UniLabJsonCommand
auto-compute_experiment_design:
feedback: {}
goal: {}
goal_default:
m_tot: '70'
ratio: null
titration_percent: '0.03'
wt_percent: '0.25'
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
m_tot:
default: '70'
type: string
ratio:
type: object
titration_percent:
default: '0.03'
type: string
wt_percent:
default: '0.25'
type: string
required:
- ratio
type: object
result:
properties:
feeding_order:
items: {}
title: Feeding Order
type: array
return_info:
title: Return Info
type: string
solutions:
items: {}
title: Solutions
type: array
solvents:
additionalProperties: true
title: Solvents
type: object
titration:
additionalProperties: true
title: Titration
type: object
required:
- solutions
- titration
- solvents
- feeding_order
- return_info
title: ComputeExperimentDesignReturn
type: object
required:
- goal
title: compute_experiment_design参数
type: object
type: UniLabJsonCommand
auto-process_order_finish_report:
feedback: {}
goal: {}
@@ -34,6 +124,110 @@ bioyond_dispensing_station:
title: process_order_finish_report参数
type: object
type: UniLabJsonCommand
auto-project_order_report:
feedback: {}
goal: {}
goal_default:
order_id: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
order_id:
type: string
required:
- order_id
type: object
result: {}
required:
- goal
title: project_order_report参数
type: object
type: UniLabJsonCommand
auto-query_resource_by_name:
feedback: {}
goal: {}
goal_default:
material_name: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
material_name:
type: string
required:
- material_name
type: object
result: {}
required:
- goal
title: query_resource_by_name参数
type: object
type: UniLabJsonCommand
auto-transfer_materials_to_reaction_station:
feedback: {}
goal: {}
goal_default:
target_device_id: null
transfer_groups: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
target_device_id:
type: string
transfer_groups:
type: array
required:
- target_device_id
- transfer_groups
type: object
result: {}
required:
- goal
title: transfer_materials_to_reaction_station参数
type: object
type: UniLabJsonCommand
auto-workflow_sample_locations:
feedback: {}
goal: {}
goal_default:
workflow_id: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
workflow_id:
type: string
required:
- workflow_id
type: object
result: {}
required:
- goal
title: workflow_sample_locations参数
type: object
type: UniLabJsonCommand
batch_create_90_10_vial_feeding_tasks:
feedback: {}
goal:

View File

@@ -61,6 +61,9 @@ camera:
device_id:
default: video_publisher
type: string
device_uuid:
default: ''
type: string
period:
default: 0.1
type: number

View File

@@ -4497,9 +4497,6 @@ liquid_handler:
simulator:
default: false
type: boolean
total_height:
default: 310
type: number
required:
- backend
- deck

View File

@@ -4,6 +4,215 @@ reaction_station.bioyond:
- reaction_station_bioyond
class:
action_value_mappings:
auto-create_order:
feedback: {}
goal: {}
goal_default:
json_str: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
json_str:
type: string
required:
- json_str
type: object
result: {}
required:
- goal
title: create_order参数
type: object
type: UniLabJsonCommand
auto-hard_delete_merged_workflows:
feedback: {}
goal: {}
goal_default:
workflow_ids: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
workflow_ids:
items:
type: string
type: array
required:
- workflow_ids
type: object
result: {}
required:
- goal
title: hard_delete_merged_workflows参数
type: object
type: UniLabJsonCommand
auto-merge_workflow_with_parameters:
feedback: {}
goal: {}
goal_default:
json_str: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
json_str:
type: string
required:
- json_str
type: object
result: {}
required:
- goal
title: merge_workflow_with_parameters参数
type: object
type: UniLabJsonCommand
auto-process_temperature_cutoff_report:
feedback: {}
goal: {}
goal_default:
report_request: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
report_request:
type: string
required:
- report_request
type: object
result: {}
required:
- goal
title: process_temperature_cutoff_report参数
type: object
type: UniLabJsonCommand
auto-process_web_workflows:
feedback: {}
goal: {}
goal_default:
web_workflow_json: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
web_workflow_json:
type: string
required:
- web_workflow_json
type: object
result: {}
required:
- goal
title: process_web_workflows参数
type: object
type: UniLabJsonCommand
auto-skip_titration_steps:
feedback: {}
goal: {}
goal_default:
preintake_id: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
preintake_id:
type: string
required:
- preintake_id
type: object
result: {}
required:
- goal
title: skip_titration_steps参数
type: object
type: UniLabJsonCommand
auto-wait_for_multiple_orders_and_get_reports:
feedback: {}
goal: {}
goal_default:
batch_create_result: null
check_interval: 10
timeout: 7200
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
batch_create_result:
type: string
check_interval:
default: 10
type: integer
timeout:
default: 7200
type: integer
required: []
type: object
result: {}
required:
- goal
title: wait_for_multiple_orders_and_get_reports参数
type: object
type: UniLabJsonCommand
auto-workflow_step_query:
feedback: {}
goal: {}
goal_default:
workflow_id: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
workflow_id:
type: string
required:
- workflow_id
type: object
result: {}
required:
- goal
title: workflow_step_query参数
type: object
type: UniLabJsonCommand
drip_back:
feedback: {}
goal:
@@ -553,19 +762,7 @@ reaction_station.bioyond:
module: unilabos.devices.workstation.bioyond_studio.reaction_station:BioyondReactionStation
protocol_type: []
status_types:
all_workflows: dict
average_viscosity: float
bioyond_status: dict
force: float
in_temperature: float
out_temperature: float
pt100_temperature: float
sensor_average_temperature: float
setting_temperature: float
speed: float
target_temperature: float
viscosity: float
workstation_status: dict
workflow_sequence: String
type: python
config_info: []
description: Bioyond反应站
@@ -577,21 +774,19 @@ reaction_station.bioyond:
config:
type: object
deck:
type: object
type: string
protocol_type:
type: string
required: []
type: object
data:
properties:
all_workflows:
type: object
bioyond_status:
type: object
workstation_status:
type: object
workflow_sequence:
items:
type: string
type: array
required:
- bioyond_status
- all_workflows
- workstation_status
- workflow_sequence
type: object
version: 1.0.0
reaction_station.reactor:
@@ -599,19 +794,34 @@ reaction_station.reactor:
- reactor
- reaction_station_bioyond
class:
action_value_mappings: {}
action_value_mappings:
auto-update_metrics:
feedback: {}
goal: {}
goal_default:
payload: null
handles: {}
placeholder_keys: {}
result: {}
schema:
description: ''
properties:
feedback: {}
goal:
properties:
payload:
type: object
required:
- payload
type: object
result: {}
required:
- goal
title: update_metrics参数
type: object
type: UniLabJsonCommand
module: unilabos.devices.workstation.bioyond_studio.reaction_station:BioyondReactor
status_types:
average_viscosity: float
force: float
in_temperature: float
out_temperature: float
pt100_temperature: float
sensor_average_temperature: float
setting_temperature: float
speed: float
target_temperature: float
viscosity: float
status_types: {}
type: python
config_info: []
description: 反应站子设备-反应器
@@ -622,30 +832,14 @@ reaction_station.reactor:
properties:
config:
type: object
deck:
type: string
protocol_type:
type: string
required: []
type: object
data:
properties:
average_viscosity:
type: number
force:
type: number
in_temperature:
type: number
out_temperature:
type: number
pt100_temperature:
type: number
sensor_average_temperature:
type: number
setting_temperature:
type: number
speed:
type: number
target_temperature:
type: number
viscosity:
type: number
properties: {}
required: []
type: object
version: 1.0.0

View File

@@ -6036,7 +6036,12 @@ workstation:
properties:
deck:
type: string
protocol_type:
items:
type: string
type: array
required:
- protocol_type
- deck
type: object
data:

View File

@@ -453,7 +453,7 @@ class Registry:
return status_schema
def _generate_unilab_json_command_schema(
self, method_args: List[Dict[str, Any]], method_name: str
self, method_args: List[Dict[str, Any]], method_name: str, return_annotation: Any = None
) -> Dict[str, Any]:
"""
根据UniLabJsonCommand方法信息生成JSON Schema暂不支持嵌套类型
@@ -461,6 +461,7 @@ class Registry:
Args:
method_args: 方法信息字典包含args等
method_name: 方法名称
return_annotation: 返回类型注解用于生成result schema仅支持TypedDict
Returns:
JSON Schema格式的参数schema
@@ -489,14 +490,68 @@ class Registry:
if param_required:
schema["required"].append(param_name)
# 生成result schema仅当return_annotation是TypedDict时
result_schema = {}
if return_annotation is not None and self._is_typed_dict(return_annotation):
result_schema = self._generate_typed_dict_result_schema(return_annotation)
return {
"title": f"{method_name}参数",
"description": f"",
"type": "object",
"properties": {"goal": schema, "feedback": {}, "result": {}},
"properties": {"goal": schema, "feedback": {}, "result": result_schema},
"required": ["goal"],
}
def _is_typed_dict(self, annotation: Any) -> bool:
"""
检查类型注解是否是TypedDict
Args:
annotation: 类型注解对象
Returns:
是否为TypedDict
"""
if annotation is None or annotation == inspect.Parameter.empty:
return False
# 使用 typing_extensions.is_typeddict 进行检查Python < 3.12 兼容)
try:
from typing_extensions import is_typeddict
return is_typeddict(annotation)
except ImportError:
# 回退方案:检查 TypedDict 特有的属性
if isinstance(annotation, type):
return hasattr(annotation, "__required_keys__") and hasattr(annotation, "__optional_keys__")
return False
def _generate_typed_dict_result_schema(self, return_annotation: Any) -> Dict[str, Any]:
"""
根据TypedDict类型生成result的JSON Schema
Args:
return_annotation: TypedDict类型注解
Returns:
JSON Schema格式的result schema
"""
if not self._is_typed_dict(return_annotation):
return {}
try:
from msgcenterpy.instances.typed_dict_instance import TypedDictMessageInstance
result_schema = TypedDictMessageInstance.get_json_schema_from_typed_dict(return_annotation)
return result_schema
except ImportError:
logger.warning("[UniLab Registry] msgcenterpy未安装无法生成TypedDict的result schema")
return {}
except Exception as e:
logger.warning(f"[UniLab Registry] 生成TypedDict result schema失败: {e}")
return {}
def _add_builtin_actions(self, device_config: Dict[str, Any], device_id: str):
"""
为设备配置添加内置的执行驱动命令动作
@@ -577,9 +632,15 @@ class Registry:
if "init_param_schema" not in device_config:
device_config["init_param_schema"] = {}
if "class" in device_config:
if "status_types" not in device_config["class"] or device_config["class"]["status_types"] is None:
if (
"status_types" not in device_config["class"]
or device_config["class"]["status_types"] is None
):
device_config["class"]["status_types"] = {}
if "action_value_mappings" not in device_config["class"] or device_config["class"]["action_value_mappings"] is None:
if (
"action_value_mappings" not in device_config["class"]
or device_config["class"]["action_value_mappings"] is None
):
device_config["class"]["action_value_mappings"] = {}
enhanced_info = {}
if complete_registry:
@@ -631,7 +692,9 @@ class Registry:
"goal": {},
"feedback": {},
"result": {},
"schema": self._generate_unilab_json_command_schema(v["args"], k),
"schema": self._generate_unilab_json_command_schema(
v["args"], k, v.get("return_annotation")
),
"goal_default": {i["name"]: i["default"] for i in v["args"]},
"handles": [],
"placeholder_keys": {

View File

@@ -2,7 +2,7 @@ container:
category:
- container
class:
module: unilabos.resources.container:RegularContainer
module: unilabos.resources.container:get_regular_container
type: pylabrobot
description: regular organic container
handles:

View File

@@ -22,6 +22,13 @@ class RegularContainer(Container):
def load_state(self, state: Dict[str, Any]):
self.state = state
def get_regular_container(name="container"):
r = RegularContainer(name=name)
r.category = "container"
return RegularContainer(name=name)
#
# class RegularContainer(object):
# # 第一个参数必须是id传入

View File

@@ -1144,7 +1144,7 @@ class BaseROS2DeviceNode(Node, Generic[T]):
queried_resources = []
for resource_data in resource_inputs:
plr_resource = await self.get_resource_with_dir(
resource_ids=resource_data["id"], with_children=True
resource_id=resource_data["id"], with_children=True
)
queried_resources.append(plr_resource)

View File

@@ -5,7 +5,7 @@ import threading
import time
import traceback
import uuid
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, Union
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, TypedDict, Union
from action_msgs.msg import GoalStatus
from geometry_msgs.msg import Point
@@ -38,6 +38,7 @@ from unilabos.ros.msgs.message_converter import (
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.ros.nodes.presets.controller_node import ControllerNode
from unilabos.ros.nodes.resource_tracker import (
ResourceDict,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
@@ -48,7 +49,7 @@ from unilabos.utils.type_check import serialize_result_info
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
if TYPE_CHECKING:
from unilabos.app.ws_client import QueueItem, WSResourceChatData
from unilabos.app.ws_client import QueueItem
@dataclass
@@ -56,6 +57,11 @@ class DeviceActionStatus:
job_ids: Dict[str, float] = field(default_factory=dict)
class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]]
devices: List[DeviceSlot]
class HostNode(BaseROS2DeviceNode):
"""
主机节点类,负责管理设备、资源和控制器
@@ -785,6 +791,16 @@ class HostNode(BaseROS2DeviceNode):
del self._goals[job_id]
self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals")
# 存储结果供 HTTP API 查询
try:
from unilabos.app.web.controller import store_job_result
if goal_status == GoalStatus.STATUS_CANCELED:
store_job_result(job_id, status, return_info, {})
else:
store_job_result(job_id, status, return_info, result_data)
except ImportError:
pass # controller 模块可能未加载
# 发布状态到桥接器
if job_id:
for bridge in self.bridges:
@@ -1347,7 +1363,7 @@ class HostNode(BaseROS2DeviceNode):
def test_resource(
self, resource: ResourceSlot, resources: List[ResourceSlot], device: DeviceSlot, devices: List[DeviceSlot]
):
) -> TestResourceReturn:
return {
"resources": ResourceTreeSet.from_plr_resources([resource, *resources]).dump(),
"devices": [device, *devices],

View File

@@ -5,6 +5,7 @@
import argparse
import importlib
import locale
import subprocess
import sys
@@ -22,13 +23,33 @@ class EnvironmentChecker:
"websockets": "websockets",
"msgcenterpy": "msgcenterpy",
"opentrons_shared_data": "opentrons_shared_data",
"typing_extensions": "typing_extensions",
}
# 特殊安装包(需要特殊处理的包)
self.special_packages = {"pylabrobot": "git+https://github.com/Xuwznln/pylabrobot.git"}
# 包版本要求(包名: 最低版本)
self.version_requirements = {
"msgcenterpy": "0.1.5", # msgcenterpy 最低版本要求
}
self.missing_packages = []
self.failed_installs = []
self.packages_need_upgrade = []
# 检测系统语言
self.is_chinese = self._is_chinese_locale()
def _is_chinese_locale(self) -> bool:
"""检测系统是否为中文环境"""
try:
lang = locale.getdefaultlocale()[0]
if lang and ("zh" in lang.lower() or "chinese" in lang.lower()):
return True
except Exception:
pass
return False
def check_package_installed(self, package_name: str) -> bool:
"""检查包是否已安装"""
@@ -38,31 +59,74 @@ class EnvironmentChecker:
except ImportError:
return False
def install_package(self, package_name: str, pip_name: str) -> bool:
def get_package_version(self, package_name: str) -> str | None:
"""获取已安装包的版本"""
try:
module = importlib.import_module(package_name)
return getattr(module, "__version__", None)
except (ImportError, AttributeError):
return None
def compare_version(self, current: str, required: str) -> bool:
"""
比较版本号
Returns:
True: current >= required
False: current < required
"""
try:
current_parts = [int(x) for x in current.split(".")]
required_parts = [int(x) for x in required.split(".")]
# 补齐长度
max_len = max(len(current_parts), len(required_parts))
current_parts.extend([0] * (max_len - len(current_parts)))
required_parts.extend([0] * (max_len - len(required_parts)))
return current_parts >= required_parts
except Exception:
return True # 如果无法比较,假设版本满足要求
def install_package(self, package_name: str, pip_name: str, upgrade: bool = False) -> bool:
"""安装包"""
try:
print_status(f"正在安装 {package_name} ({pip_name})...", "info")
action = "升级" if upgrade else "安装"
print_status(f"正在{action} {package_name} ({pip_name})...", "info")
# 构建安装命令
cmd = [sys.executable, "-m", "pip", "install", pip_name]
cmd = [sys.executable, "-m", "pip", "install"]
# 如果是升级操作,添加 --upgrade 参数
if upgrade:
cmd.append("--upgrade")
cmd.append(pip_name)
# 如果是中文环境,使用清华镜像源
if self.is_chinese:
cmd.extend(["-i", "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple"])
# 执行安装
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) # 5分钟超时
if result.returncode == 0:
print_status(f"{package_name} 安装成功", "success")
print_status(f"{package_name} {action}成功", "success")
return True
else:
print_status(f"× {package_name} 安装失败: {result.stderr}", "error")
print_status(f"× {package_name} {action}失败: {result.stderr}", "error")
return False
except subprocess.TimeoutExpired:
print_status(f"× {package_name} 安装超时", "error")
print_status(f"× {package_name} {action}超时", "error")
return False
except Exception as e:
print_status(f"× {package_name} 安装异常: {str(e)}", "error")
print_status(f"× {package_name} {action}异常: {str(e)}", "error")
return False
def upgrade_package(self, package_name: str, pip_name: str) -> bool:
"""升级包"""
return self.install_package(package_name, pip_name, upgrade=True)
def check_all_packages(self) -> bool:
"""检查所有必需的包"""
print_status("开始检查环境依赖...", "info")
@@ -71,60 +135,116 @@ class EnvironmentChecker:
for import_name, pip_name in self.required_packages.items():
if not self.check_package_installed(import_name):
self.missing_packages.append((import_name, pip_name))
else:
# 检查版本要求
if import_name in self.version_requirements:
current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name]
if current_version:
if not self.compare_version(current_version, required_version):
print_status(
f"{import_name} 版本过低 (当前: {current_version}, 需要: >={required_version})",
"warning",
)
self.packages_need_upgrade.append((import_name, pip_name))
# 检查特殊包
for package_name, install_url in self.special_packages.items():
if not self.check_package_installed(package_name):
self.missing_packages.append((package_name, install_url))
if not self.missing_packages:
all_ok = not self.missing_packages and not self.packages_need_upgrade
if all_ok:
print_status("✓ 所有依赖包检查完成,环境正常", "success")
return True
print_status(f"发现 {len(self.missing_packages)} 个缺失的包", "warning")
if self.missing_packages:
print_status(f"发现 {len(self.missing_packages)} 个缺失的包", "warning")
if self.packages_need_upgrade:
print_status(f"发现 {len(self.packages_need_upgrade)} 个需要升级的包", "warning")
return False
def install_missing_packages(self, auto_install: bool = True) -> bool:
"""安装缺失的包"""
if not self.missing_packages:
if not self.missing_packages and not self.packages_need_upgrade:
return True
if not auto_install:
print_status("缺失以下包:", "warning")
for import_name, pip_name in self.missing_packages:
print_status(f" - {import_name} (pip install {pip_name})", "warning")
if self.missing_packages:
print_status("缺失以下包:", "warning")
for import_name, pip_name in self.missing_packages:
print_status(f" - {import_name} (pip install {pip_name})", "warning")
if self.packages_need_upgrade:
print_status("需要升级以下包:", "warning")
for import_name, pip_name in self.packages_need_upgrade:
print_status(f" - {import_name} (pip install --upgrade {pip_name})", "warning")
return False
print_status(f"开始自动安装 {len(self.missing_packages)} 个缺失的包...", "info")
# 安装缺失的包
if self.missing_packages:
print_status(f"开始自动安装 {len(self.missing_packages)} 个缺失的包...", "info")
success_count = 0
for import_name, pip_name in self.missing_packages:
if self.install_package(import_name, pip_name):
success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
success_count = 0
for import_name, pip_name in self.missing_packages:
if self.install_package(import_name, pip_name):
success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功安装 {success_count}/{len(self.missing_packages)} 个包", "success")
# 升级需要更新的包
if self.packages_need_upgrade:
print_status(f"开始自动升级 {len(self.packages_need_upgrade)} 个包...", "info")
upgrade_success_count = 0
for import_name, pip_name in self.packages_need_upgrade:
if self.upgrade_package(import_name, pip_name):
upgrade_success_count += 1
else:
self.failed_installs.append((import_name, pip_name))
print_status(f"✓ 成功升级 {upgrade_success_count}/{len(self.packages_need_upgrade)} 个包", "success")
if self.failed_installs:
print_status(f"{len(self.failed_installs)} 个包安装失败:", "error")
print_status(f"{len(self.failed_installs)} 个包操作失败:", "error")
for import_name, pip_name in self.failed_installs:
print_status(f" - {import_name} (pip install {pip_name})", "error")
print_status(f" - {import_name} ({pip_name})", "error")
return False
print_status(f"✓ 成功安装 {success_count} 个包", "success")
return True
def verify_installation(self) -> bool:
"""验证安装结果"""
if not self.missing_packages:
if not self.missing_packages and not self.packages_need_upgrade:
return True
print_status("验证安装结果...", "info")
failed_verification = []
# 验证新安装的包
for import_name, pip_name in self.missing_packages:
if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name))
# 验证升级的包
for import_name, pip_name in self.packages_need_upgrade:
if not self.check_package_installed(import_name):
failed_verification.append((import_name, pip_name))
elif import_name in self.version_requirements:
current_version = self.get_package_version(import_name)
required_version = self.version_requirements[import_name]
if current_version and not self.compare_version(current_version, required_version):
failed_verification.append((import_name, pip_name))
print_status(
f" {import_name} 版本仍然过低 (当前: {current_version}, 需要: >={required_version})",
"error",
)
if failed_verification:
print_status(f"{len(failed_verification)} 个包验证失败:", "error")
for import_name, pip_name in failed_verification:

View File

@@ -239,8 +239,12 @@ class ImportManager:
cls = get_class(class_path)
class_name = cls.__name__
result = {"class_name": class_name, "init_params": self._analyze_method_signature(cls.__init__)["args"],
"status_methods": {}, "action_methods": {}}
result = {
"class_name": class_name,
"init_params": self._analyze_method_signature(cls.__init__)["args"],
"status_methods": {},
"action_methods": {},
}
# 分析类的所有成员
for name, method in cls.__dict__.items():
if name.startswith("_"):
@@ -374,6 +378,7 @@ class ImportManager:
"name": method.__name__,
"args": args,
"return_type": self._get_type_string(signature.return_annotation),
"return_annotation": signature.return_annotation, # 保留原始类型注解用于TypedDict等特殊处理
"is_async": inspect.iscoroutinefunction(method),
}

View File

@@ -124,11 +124,14 @@ class ColoredFormatter(logging.Formatter):
def _format_basic(self, record):
"""基本格式化,不包含颜色"""
datetime_str = datetime.fromtimestamp(record.created).strftime("%y-%m-%d [%H:%M:%S,%f")[:-3] + "]"
filename = os.path.basename(record.filename).rsplit(".", 1)[0] # 提取文件名(不含路径和扩展名)
filename = record.filename.replace(".py", "").split("\\")[-1] # 提取文件名(不含路径和扩展名)
if "/" in filename:
filename = filename.split("/")[-1]
module_path = f"{record.name}.{filename}"
func_line = f"{record.funcName}:{record.lineno}"
right_info = f" [{func_line}] [{module_path}]"
formatted_message = f"{datetime_str} [{record.levelname}] [{module_path}] [{func_line}]: {record.getMessage()}"
formatted_message = f"{datetime_str} [{record.levelname}] {record.getMessage()}{right_info}"
if record.exc_info:
exc_text = self.formatException(record.exc_info)
@@ -150,7 +153,7 @@ class ColoredFormatter(logging.Formatter):
# 配置日志处理器
def configure_logger(loglevel=None):
def configure_logger(loglevel=None, working_dir=None):
"""配置日志记录器
Args:
@@ -191,9 +194,30 @@ def configure_logger(loglevel=None):
# 添加处理器到根日志记录器
root_logger.addHandler(console_handler)
# 如果指定了工作目录,添加文件处理器
if working_dir is not None:
logs_dir = os.path.join(working_dir, "logs")
os.makedirs(logs_dir, exist_ok=True)
# 生成日志文件名:日期 时间.log
log_filename = datetime.now().strftime("%Y-%m-%d %H-%M-%S") + ".log"
log_filepath = os.path.join(logs_dir, log_filename)
# 创建文件处理器
file_handler = logging.FileHandler(log_filepath, encoding="utf-8")
file_handler.setLevel(root_logger.level)
# 使用不带颜色的格式化器
file_formatter = ColoredFormatter(use_colors=False)
file_handler.setFormatter(file_formatter)
root_logger.addHandler(file_handler)
logging.getLogger("asyncio").setLevel(logging.INFO)
logging.getLogger("urllib3").setLevel(logging.INFO)
# 配置日志系统
configure_logger()