Compare commits

..

4 Commits

Author SHA1 Message Date
Xuwznln
554bcade24 Support unilabos_samples key 2025-11-19 15:53:59 +08:00
ZiWei
a662c75de1 feat(bioyond): 添加测量小瓶仓库和更新仓库工厂函数参数 2025-11-19 14:26:12 +08:00
ZiWei
931614fe64 feat(bioyond_studio): 添加项目API接口支持及优化物料管理功能
添加通用项目API接口方法(_post_project_api, _delete_project_api)用于与LIMS系统交互
实现compute_experiment_design方法用于实验设计计算
新增brief_step_parameters等订单相关接口方法
优化物料转移逻辑,增加异步任务处理
扩展BioyondV1RPC类,添加批量物料操作、订单状态管理等功能
2025-11-19 14:26:10 +08:00
Xuwznln
d39662f65f Update oss config 2025-11-19 14:22:03 +08:00
15 changed files with 1551 additions and 592 deletions

View File

@@ -67,14 +67,6 @@ class WSConfig:
max_reconnect_attempts = 999 # 最大重连次数 max_reconnect_attempts = 999 # 最大重连次数
ping_interval = 30 # ping间隔 ping_interval = 30 # ping间隔
# OSS上传配置
class OSSUploadConfig:
api_host = "" # API主机地址
authorization = "" # 授权信息
init_endpoint = "" # 初始化端点
complete_endpoint = "" # 完成端点
max_retries = 3 # 最大重试次数
# HTTP配置 # HTTP配置
class HTTPConfig: class HTTPConfig:
remote_addr = "https://uni-lab.bohrium.com/api/v1" # 远程服务器地址 remote_addr = "https://uni-lab.bohrium.com/api/v1" # 远程服务器地址
@@ -294,19 +286,7 @@ HTTP 客户端配置用于与云端服务通信:
- UAT 环境:`https://uni-lab.uat.bohrium.com/api/v1` - UAT 环境:`https://uni-lab.uat.bohrium.com/api/v1`
- 本地环境:`http://127.0.0.1:48197/api/v1` - 本地环境:`http://127.0.0.1:48197/api/v1`
### 4. OSSUploadConfig - OSS 上传配置 ### 4. ROSConfig - ROS 配置
对象存储服务配置,用于文件上传功能:
| 参数 | 类型 | 默认值 | 说明 |
| ------------------- | ---- | ------ | -------------------- |
| `api_host` | str | `""` | OSS API 主机地址 |
| `authorization` | str | `""` | 授权认证信息 |
| `init_endpoint` | str | `""` | 上传初始化端点 |
| `complete_endpoint` | str | `""` | 上传完成端点 |
| `max_retries` | int | `3` | 上传失败最大重试次数 |
### 5. ROSConfig - ROS 配置
配置 ROS 消息转换器需要加载的模块: 配置 ROS 消息转换器需要加载的模块:

View File

@@ -5,6 +5,7 @@
## 概述 ## 概述
注册表Registry是 Uni-Lab 的设备配置系统,采用 YAML 格式定义设备的: 注册表Registry是 Uni-Lab 的设备配置系统,采用 YAML 格式定义设备的:
- 可用动作Actions - 可用动作Actions
- 状态类型Status Types - 状态类型Status Types
- 初始化参数Init Parameters - 初始化参数Init Parameters
@@ -33,7 +34,7 @@
### 核心字段说明 ### 核心字段说明
| 字段名 | 类型 | 需要手写 | 说明 | | 字段名 | 类型 | 需要手写 | 说明 |
| ----------------- | ------ | -------- | ----------------------------------- | | ----------------- | ------ | -------- | --------------------------------- |
| 设备标识符 | string | 是 | 设备的唯一名字,如 `mock_chiller` | | 设备标识符 | string | 是 | 设备的唯一名字,如 `mock_chiller` |
| class | object | 部分 | 设备的核心信息,必须配置 | | class | object | 部分 | 设备的核心信息,必须配置 |
| description | string | 否 | 设备描述,系统默认给空字符串 | | description | string | 否 | 设备描述,系统默认给空字符串 |
@@ -74,8 +75,8 @@ my_device:
goal: { ... } goal: { ... }
result: { ... } result: { ... }
description: "设备描述" description: '设备描述'
version: "1.0.0" version: '1.0.0'
category: category:
- device_category - device_category
handles: [] handles: []
@@ -106,6 +107,7 @@ my_device:
适合大多数场景,快速高效。 适合大多数场景,快速高效。
**步骤** **步骤**
1. 启动 Uni-Lab 1. 启动 Uni-Lab
2. 访问 Web 界面的"注册表编辑器" 2. 访问 Web 界面的"注册表编辑器"
3. 上传您的 Python 设备驱动文件 3. 上传您的 Python 设备驱动文件
@@ -125,6 +127,7 @@ unilab -g dev.json --complete_registry --registry_path ./my_registry
``` ```
系统会: 系统会:
1. 扫描 Python 类 1. 扫描 Python 类
2. 分析方法签名和类型 2. 分析方法签名和类型
3. 自动生成缺失的字段 3. 自动生成缺失的字段
@@ -186,6 +189,7 @@ my_device:
| ROS 动作类型 | 标准 ROS 动作 | goal_default 和 schema | | ROS 动作类型 | 标准 ROS 动作 | goal_default 和 schema |
**常用的 ROS 动作类型** **常用的 ROS 动作类型**
- `SendCmd`:发送简单命令 - `SendCmd`:发送简单命令
- `NavigateThroughPoses`:导航动作 - `NavigateThroughPoses`:导航动作
- `SingleJointPosition`:单关节位置控制 - `SingleJointPosition`:单关节位置控制
@@ -297,7 +301,7 @@ my_device:
### 识别规则 ### 识别规则
| Python 类型 | placeholder_keys 值 | 前端效果 | | Python 类型 | placeholder_keys 值 | 前端效果 |
|-----------|-------------------|---------| | -------------------- | -------------------- | -------------- |
| `ResourceSlot` | `unilabos_resources` | 单选资源下拉框 | | `ResourceSlot` | `unilabos_resources` | 单选资源下拉框 |
| `List[ResourceSlot]` | `unilabos_resources` | 多选资源下拉框 | | `List[ResourceSlot]` | `unilabos_resources` | 多选资源下拉框 |
| `DeviceSlot` | `unilabos_devices` | 单选设备下拉框 | | `DeviceSlot` | `unilabos_devices` | 单选设备下拉框 |
@@ -313,6 +317,7 @@ placeholder_keys:
``` ```
**前端渲染**: **前端渲染**:
``` ```
Source: [下拉选择框 ▼] Source: [下拉选择框 ▼]
├── plate_1 (96孔板) ├── plate_1 (96孔板)
@@ -329,6 +334,7 @@ placeholder_keys:
``` ```
**前端渲染**: **前端渲染**:
``` ```
Targets: [多选下拉框 ▼] Targets: [多选下拉框 ▼]
☑ plate_1 (96孔板) ☑ plate_1 (96孔板)
@@ -345,6 +351,7 @@ placeholder_keys:
``` ```
**前端渲染**: **前端渲染**:
``` ```
Pump: [下拉选择框 ▼] Pump: [下拉选择框 ▼]
├── pump_1 (注射泵A) ├── pump_1 (注射泵A)
@@ -360,6 +367,7 @@ placeholder_keys:
``` ```
**前端渲染**: **前端渲染**:
``` ```
Sync Devices: [多选下拉框 ▼] Sync Devices: [多选下拉框 ▼]
☑ heater_1 (加热器A) ☑ heater_1 (加热器A)
@@ -414,7 +422,7 @@ placeholder_keys:
### status_types ### status_types
系统会扫描你的 Python 类从状态方法propertyget_方法自动生成这部分 系统会扫描你的 Python 类从状态方法propertyget\_方法自动生成这部分
```yaml ```yaml
status_types: status_types:
@@ -424,6 +432,7 @@ status_types:
``` ```
**注意事项** **注意事项**
- 系统会查找所有 `get_` 开头的方法和 `@property` 装饰的属性 - 系统会查找所有 `get_` 开头的方法和 `@property` 装饰的属性
- 类型会自动转成相应的类型(如 `str``float``bool` - 类型会自动转成相应的类型(如 `str``float``bool`
- 如果类型是 `Any``None` 或未知的,默认使用 `String` - 如果类型是 `Any``None` 或未知的,默认使用 `String`
@@ -459,6 +468,7 @@ init_param_schema:
``` ```
**生成规则** **生成规则**
- `config` 部分:分析 `__init__` 方法的参数、类型和默认值 - `config` 部分:分析 `__init__` 方法的参数、类型和默认值
- `data` 部分:根据 `status_types` 生成前端显示用的类型定义 - `data` 部分:根据 `status_types` 生成前端显示用的类型定义
@@ -619,22 +629,22 @@ advanced_liquid_handler:
result: result:
success: success success: success
schema: schema:
description: "转移液体" description: '转移液体'
properties: properties:
goal: goal:
properties: properties:
source: source:
type: object type: object
description: "源容器" description: '源容器'
target: target:
type: object type: object
description: "目标容器" description: '目标容器'
volume: volume:
type: number type: number
description: "体积(μL)" description: '体积(μL)'
tip: tip:
type: object type: object
description: "枪头(可选)" description: '枪头(可选)'
required: required:
- source - source
- target - target
@@ -668,12 +678,12 @@ advanced_liquid_handler:
result: result:
success: success success: success
description: "高级液体处理工作站,支持多目标转移和设备协同" description: '高级液体处理工作站,支持多目标转移和设备协同'
version: "1.0.0" version: '1.0.0'
category: category:
- liquid_handling - liquid_handling
handles: [] handles: []
icon: "" icon: ''
``` ```
### 另一个完整示例:温度控制器 ### 另一个完整示例:温度控制器
@@ -895,6 +905,7 @@ cat unilabos/registry/devices/my_device.yaml
### 2. 验证 placeholder_keys ### 2. 验证 placeholder_keys
确认: 确认:
- ResourceSlot 参数有 `unilabos_resources` - ResourceSlot 参数有 `unilabos_resources`
- DeviceSlot 参数有 `unilabos_devices` - DeviceSlot 参数有 `unilabos_devices`
- List 类型被正确识别 - List 类型被正确识别
@@ -919,8 +930,10 @@ python -c "from unilabos.devices.my_module.my_device import MyDevice"
### Q1: placeholder_keys 没有自动生成 ### Q1: placeholder_keys 没有自动生成
**检查**: **检查**:
1. 是否使用了`--complete_registry`参数? 1. 是否使用了`--complete_registry`参数?
2. 类型注解是否正确? 2. 类型注解是否正确?
```python ```python
# ✓ 正确 # ✓ 正确
def method(self, resource: ResourceSlot): def method(self, resource: ResourceSlot):
@@ -928,6 +941,7 @@ python -c "from unilabos.devices.my_module.my_device import MyDevice"
# ✗ 错误(缺少类型注解) # ✗ 错误(缺少类型注解)
def method(self, resource): def method(self, resource):
``` ```
3. 是否正确导入? 3. 是否正确导入?
```python ```python
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
@@ -938,6 +952,7 @@ python -c "from unilabos.devices.my_module.my_device import MyDevice"
**原因**: placeholder_keys 未正确配置 **原因**: placeholder_keys 未正确配置
**解决**: **解决**:
```yaml ```yaml
# 检查YAML中是否有 # 检查YAML中是否有
placeholder_keys: placeholder_keys:
@@ -947,6 +962,7 @@ placeholder_keys:
### Q3: 多选不工作 ### Q3: 多选不工作
**检查类型注解**: **检查类型注解**:
```python ```python
# ✓ 正确 - 会生成多选 # ✓ 正确 - 会生成多选
def method(self, resources: List[ResourceSlot]): def method(self, resources: List[ResourceSlot]):
@@ -960,6 +976,7 @@ def method(self, resources: ResourceSlot):
**说明**: 运行时会自动转换 **说明**: 运行时会自动转换
前端传递: 前端传递:
```json ```json
{ {
"resource": "plate_1" // 字符串ID "resource": "plate_1" // 字符串ID
@@ -967,6 +984,7 @@ def method(self, resources: ResourceSlot):
``` ```
运行时收到: 运行时收到:
```python ```python
resource.id # "plate_1" resource.id # "plate_1"
resource.name # "96孔板" resource.name # "96孔板"
@@ -977,6 +995,7 @@ resource.type # "resource"
### Q5: 设备加载不了 ### Q5: 设备加载不了
**检查**: **检查**:
1. 确认 `class.module` 路径是否正确 1. 确认 `class.module` 路径是否正确
2. 确认 Python 驱动类能否正常导入 2. 确认 Python 驱动类能否正常导入
3. 使用 yaml 验证器检查文件格式 3. 使用 yaml 验证器检查文件格式
@@ -985,6 +1004,7 @@ resource.type # "resource"
### Q6: 自动生成失败 ### Q6: 自动生成失败
**检查**: **检查**:
1. 确认类继承了正确的基类 1. 确认类继承了正确的基类
2. 确保状态方法的返回类型注解清晰 2. 确保状态方法的返回类型注解清晰
3. 检查类能否被动态导入 3. 检查类能否被动态导入
@@ -993,6 +1013,7 @@ resource.type # "resource"
### Q7: 前端显示问题 ### Q7: 前端显示问题
**解决步骤**: **解决步骤**:
1. 删除旧的 yaml 文件,用编辑器重新生成 1. 删除旧的 yaml 文件,用编辑器重新生成
2. 清除浏览器缓存,重新加载页面 2. 清除浏览器缓存,重新加载页面
3. 确认必需字段(如 `schema`)都存在 3. 确认必需字段(如 `schema`)都存在
@@ -1001,6 +1022,7 @@ resource.type # "resource"
### Q8: 动作执行出错 ### Q8: 动作执行出错
**检查**: **检查**:
1. 确认动作方法名符合规范(如 `execute_<action_name>` 1. 确认动作方法名符合规范(如 `execute_<action_name>`
2. 检查 `goal` 字段的参数映射是否正确 2. 检查 `goal` 字段的参数映射是否正确
3. 确认方法返回值格式符合 `result` 映射 3. 确认方法返回值格式符合 `result` 映射
@@ -1075,6 +1097,7 @@ def method(
``` ```
5. **方法命名规范** 5. **方法命名规范**
- 状态方法使用 `@property` 装饰器或 `get_` 前缀 - 状态方法使用 `@property` 装饰器或 `get_` 前缀
- 动作方法使用动词开头 - 动作方法使用动词开头
- 保持命名清晰、一致 - 保持命名清晰、一致
@@ -1114,5 +1137,3 @@ def method(
- Python [typing 模块](https://docs.python.org/3/library/typing.html) - Python [typing 模块](https://docs.python.org/3/library/typing.html)
- [YAML 语法](https://yaml.org/) - [YAML 语法](https://yaml.org/)
- [JSON Schema](https://json-schema.org/) - [JSON Schema](https://json-schema.org/)

View File

@@ -50,8 +50,6 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
self.client = tcp.register_node_list(self.nodes) self.client = tcp.register_node_list(self.nodes)
``` ```
## 2. 编写驱动与寄存器读写 ## 2. 编写驱动与寄存器读写
### 2.1 寄存器示例 ### 2.1 寄存器示例
@@ -95,8 +93,8 @@ def start_and_read_metrics(self):
完成工站类与驱动后,需要生成(或更新)工站注册表供系统识别。 完成工站类与驱动后,需要生成(或更新)工站注册表供系统识别。
### 3.1 新增工站设备(或资源)首次生成注册表 ### 3.1 新增工站设备(或资源)首次生成注册表
首先通过以下命令启动 unilab。进入 unilab 系统状态检查页面 首先通过以下命令启动 unilab。进入 unilab 系统状态检查页面
```bash ```bash
@@ -112,6 +110,7 @@ python unilabos\app\main.py -g celljson.json --ak <user的AK> --sk <user的SK>
![注册表生成流程](image_battery_plc/unilab_registry_process.png) ![注册表生成流程](image_battery_plc/unilab_registry_process.png)
步骤说明: 步骤说明:
1. 选择新增的工站`coin_cell_assembly.py`文件 1. 选择新增的工站`coin_cell_assembly.py`文件
2. 点击分析按钮,分析`coin_cell_assembly.py`文件 2. 点击分析按钮,分析`coin_cell_assembly.py`文件
3. 选择`coin_cell_assembly.py`文件中继承`WorkstationBase` 3. 选择`coin_cell_assembly.py`文件中继承`WorkstationBase`
@@ -124,20 +123,16 @@ python unilabos\app\main.py -g celljson.json --ak <user的AK> --sk <user的SK>
![生成的YAML文件](image_battery_plc/unilab_new_yaml.png) ![生成的YAML文件](image_battery_plc/unilab_new_yaml.png)
### 3.2 添加新生成注册表 ### 3.2 添加新生成注册表
`unilabos\registry\devices`目录下新建一个 yaml 文件,此处新建文件命名为`coincellassemblyworkstation_device.yaml`,将上面生成的新的注册表信息粘贴到`coincellassemblyworkstation_device.yaml`文件中。 `unilabos\registry\devices`目录下新建一个 yaml 文件,此处新建文件命名为`coincellassemblyworkstation_device.yaml`,将上面生成的新的注册表信息粘贴到`coincellassemblyworkstation_device.yaml`文件中。
在终端输入以下命令进行注册表补全操作。 在终端输入以下命令进行注册表补全操作。
```bash ```bash
python unilabos\app\register.py --complete_registry python unilabos\app\register.py --complete_registry
``` ```
### 3.3 启动并上传注册表 ### 3.3 启动并上传注册表
新增设备之后,启动 unilab 需要增加`--upload_registry`参数,来上传注册表信息。 新增设备之后,启动 unilab 需要增加`--upload_registry`参数,来上传注册表信息。
@@ -159,6 +154,7 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC
### 4.2 首次接入流程 ### 4.2 首次接入流程
首次新增设备(或资源)需要完整流程: 首次新增设备(或资源)需要完整流程:
1. ✅ 在网页端生成注册表信息 1. ✅ 在网页端生成注册表信息
2. ✅ 使用 `--complete_registry` 补全注册表 2. ✅ 使用 `--complete_registry` 补全注册表
3. ✅ 使用 `--upload_registry` 上传注册表信息 3. ✅ 使用 `--upload_registry` 上传注册表信息
@@ -166,6 +162,7 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC
### 4.3 驱动更新流程 ### 4.3 驱动更新流程
如果不是新增设备,仅修改了工站驱动的 `.py` 文件: 如果不是新增设备,仅修改了工站驱动的 `.py` 文件:
1. ✅ 运行 `--complete_registry` 补全注册表 1. ✅ 运行 `--complete_registry` 补全注册表
2. ✅ 运行 `--upload_registry` 上传注册表 2. ✅ 运行 `--upload_registry` 上传注册表
3. ❌ 不需要在网页端重新生成注册表 3. ❌ 不需要在网页端重新生成注册表
@@ -203,5 +200,3 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC
5. ✅ 新增设备与更新驱动的区别 5. ✅ 新增设备与更新驱动的区别
这个案例展示了完整的 PLC 设备接入流程,可以作为其他类似设备接入的参考模板。 这个案例展示了完整的 PLC 设备接入流程,可以作为其他类似设备接入的参考模板。

View File

@@ -592,7 +592,7 @@ class PLCWorkstation(WorkstationBase):
### 8.1 WorkstationBase 核心属性 ### 8.1 WorkstationBase 核心属性
| 属性 | 类型 | 说明 | | 属性 | 类型 | 说明 |
| --------------------------- | ----------------------- | ----------------------------- | | ------------------------- | ----------------------- | ------------------------------- |
| `_ros_node` | ROS2WorkstationNode | ROS 节点引用,由 post_init 设置 | | `_ros_node` | ROS2WorkstationNode | ROS 节点引用,由 post_init 设置 |
| `deck` | Deck | PyLabRobot Deck本地物料系统 | | `deck` | Deck | PyLabRobot Deck本地物料系统 |
| `plr_resources` | Dict[str, PLRResource] | 物料资源映射 | | `plr_resources` | Dict[str, PLRResource] | 物料资源映射 |

View File

@@ -592,4 +592,3 @@ ros2 topic list
- [ROS2 网络配置](https://docs.ros.org/en/humble/Tutorials/Advanced/Networking.html) - [ROS2 网络配置](https://docs.ros.org/en/humble/Tutorials/Advanced/Networking.html)
- [DDS 配置](https://fast-dds.docs.eprosima.com/) - [DDS 配置](https://fast-dds.docs.eprosima.com/)
- Uni-Lab 云平台文档 - Uni-Lab 云平台文档

View File

@@ -1,161 +1,158 @@
import argparse import argparse
import os import os
import time import time
from typing import Dict, Optional, Tuple from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Tuple, Union
import requests import requests
from unilabos.config.config import OSSUploadConfig from unilabos.app.web.client import http_client, HTTPClient
from unilabos.utils import logger
def _init_upload(file_path: str, oss_path: str, filename: Optional[str] = None, def _get_oss_token(
process_key: str = "file-upload", device_id: str = "default", filename: str,
expires_hours: int = 1) -> Tuple[bool, Dict]: driver_name: str = "default",
exp_type: str = "default",
client: Optional[HTTPClient] = None,
) -> Tuple[bool, Dict]:
""" """
初始化上传过程 获取OSS上传Token
Args: Args:
file_path: 本地文件路径 filename: 文件名
oss_path: OSS目标路径 driver_name: 驱动名称
filename: 文件名如果为None则使用file_path的文件名 exp_type: 实验类型
process_key: 处理键 client: HTTPClient实例如果不提供则使用默认的http_client
device_id: 设备ID
expires_hours: 链接过期小时数
Returns: Returns:
(成功标志, 响应数据) (成功标志, Token数据字典包含token/path/host/expires)
""" """
if filename is None: # 使用提供的client或默认的http_client
filename = os.path.basename(file_path) if client is None:
client = http_client
# 构造初始化请求 # 构造scene参数: driver_name-exp_type
url = f"{OSSUploadConfig.api_host}{OSSUploadConfig.init_endpoint}" scene = f"{driver_name}-{exp_type}"
headers = {
"Authorization": OSSUploadConfig.authorization,
"Content-Type": "application/json"
}
payload = { # 构造请求URL使用client的remote_addr已包含/api/v1/
"device_id": device_id, url = f"{client.remote_addr}/applications/token"
"process_key": process_key, params = {"scene": scene, "filename": filename}
"filename": filename,
"path": oss_path,
"expires_hours": expires_hours
}
try: try:
response = requests.post(url, headers=headers, json=payload) logger.info(f"[OSS] 请求预签名URL: scene={scene}, filename={filename}")
if response.status_code == 201: response = requests.get(url, params=params, headers={"Authorization": f"Lab {client.auth}"}, timeout=10)
result = response.json()
if result.get("code") == "10000":
return True, result.get("data", {})
print(f"初始化上传失败: {response.status_code}, {response.text}") if response.status_code == 200:
result = response.json()
if result.get("code") == 0:
data = result.get("data", {})
# 转换expires时间戳为可读格式
expires_timestamp = data.get("expires", 0)
expires_datetime = datetime.fromtimestamp(expires_timestamp)
expires_str = expires_datetime.strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"[OSS] 获取预签名URL成功")
logger.info(f"[OSS] - URL: {data.get('url', 'N/A')}")
logger.info(f"[OSS] - Expires: {expires_str} (timestamp: {expires_timestamp})")
return True, data
logger.error(f"[OSS] 获取预签名URL失败: {response.status_code}, {response.text}")
return False, {} return False, {}
except Exception as e: except Exception as e:
print(f"初始化上传异常: {str(e)}") logger.error(f"[OSS] 获取预签名URL异常: {str(e)}")
return False, {} return False, {}
def _put_upload(file_path: str, upload_url: str) -> bool: def _put_upload(file_path: str, upload_url: str) -> bool:
""" """
执行PUT上传 使用预签名URL上传文件到OSS
Args: Args:
file_path: 本地文件路径 file_path: 本地文件路径
upload_url: 上传URL upload_url: 完整的预签名上传URL
Returns: Returns:
是否成功 是否成功
""" """
try: try:
logger.info(f"[OSS] 开始上传文件: {file_path}")
with open(file_path, "rb") as f: with open(file_path, "rb") as f:
response = requests.put(upload_url, data=f) # 使用预签名URL上传不需要额外的认证header
response = requests.put(upload_url, data=f, timeout=300)
if response.status_code == 200: if response.status_code == 200:
logger.info(f"[OSS] 文件上传成功")
return True return True
print(f"PUT上传失败: {response.status_code}, {response.text}") logger.error(f"[OSS] 上传失败: {response.status_code}")
logger.error(f"[OSS] 响应内容: {response.text[:500] if response.text else '无响应内容'}")
return False return False
except Exception as e: except Exception as e:
print(f"PUT上传异常: {str(e)}") logger.error(f"[OSS] 上传异常: {str(e)}")
return False return False
def _complete_upload(uuid: str) -> bool: def oss_upload(
""" file_path: Union[str, Path],
完成上传过程 filename: Optional[str] = None,
driver_name: str = "default",
Args: exp_type: str = "default",
uuid: 上传的UUID max_retries: int = 3,
client: Optional[HTTPClient] = None,
Returns: ) -> Dict:
是否成功
"""
url = f"{OSSUploadConfig.api_host}{OSSUploadConfig.complete_endpoint}"
headers = {
"Authorization": OSSUploadConfig.authorization,
"Content-Type": "application/json"
}
payload = {
"uuid": uuid
}
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
result = response.json()
if result.get("code") == "10000":
return True
print(f"完成上传失败: {response.status_code}, {response.text}")
return False
except Exception as e:
print(f"完成上传异常: {str(e)}")
return False
def oss_upload(file_path: str, oss_path: str, filename: Optional[str] = None,
process_key: str = "file-upload", device_id: str = "default") -> bool:
""" """
文件上传主函数,包含重试机制 文件上传主函数,包含重试机制
Args: Args:
file_path: 本地文件路径 file_path: 本地文件路径
oss_path: OSS目标路径
filename: 文件名如果为None则使用file_path的文件名 filename: 文件名如果为None则使用file_path的文件名
process_key: 处理键 driver_name: 驱动名称用于构造scene
device_id: 设备ID exp_type: 实验类型用于构造scene
max_retries: 最大重试次数
client: HTTPClient实例如果不提供则使用默认的http_client
Returns: Returns:
是否成功上传 Dict: {
"success": bool, # 是否上传成功
"original_path": str, # 原始文件路径
"oss_path": str # OSS路径成功时或空字符串失败时
}
""" """
max_retries = OSSUploadConfig.max_retries file_path = Path(file_path)
if filename is None:
filename = os.path.basename(file_path)
if not os.path.exists(file_path):
logger.error(f"[OSS] 文件不存在: {file_path}")
return {"success": False, "original_path": file_path, "oss_path": ""}
retry_count = 0 retry_count = 0
oss_path = ""
while retry_count < max_retries: while retry_count < max_retries:
try: try:
# 步骤1初始化上传 # 步骤1获取预签名URL
init_success, init_data = _init_upload( token_success, token_data = _get_oss_token(
file_path=file_path, filename=filename, driver_name=driver_name, exp_type=exp_type, client=client
oss_path=oss_path,
filename=filename,
process_key=process_key,
device_id=device_id
) )
if not init_success: if not token_success:
print(f"初始化上传失败,重试 {retry_count + 1}/{max_retries}") logger.warning(f"[OSS] 获取预签名URL失败,重试 {retry_count + 1}/{max_retries}")
retry_count += 1 retry_count += 1
time.sleep(1) # 等待1秒后重试 time.sleep(1)
continue continue
# 获取UUID和上传URL # 获取预签名URL和OSS路径
uuid = init_data.get("uuid") upload_url = token_data.get("url")
upload_url = init_data.get("upload_url") oss_path = token_data.get("path", "")
if not uuid or not upload_url: if not upload_url:
print(f"初始化上传返回数据不完整,重试 {retry_count + 1}/{max_retries}") logger.warning(f"[OSS] 无法获取上传URLAPI未返回url字段")
retry_count += 1 retry_count += 1
time.sleep(1) time.sleep(1)
continue continue
@@ -163,69 +160,82 @@ def oss_upload(file_path: str, oss_path: str, filename: Optional[str] = None,
# 步骤2PUT上传文件 # 步骤2PUT上传文件
put_success = _put_upload(file_path, upload_url) put_success = _put_upload(file_path, upload_url)
if not put_success: if not put_success:
print(f"PUT上传失败重试 {retry_count + 1}/{max_retries}") logger.warning(f"[OSS] PUT上传失败重试 {retry_count + 1}/{max_retries}")
retry_count += 1
time.sleep(1)
continue
# 步骤3完成上传
complete_success = _complete_upload(uuid)
if not complete_success:
print(f"完成上传失败,重试 {retry_count + 1}/{max_retries}")
retry_count += 1 retry_count += 1
time.sleep(1) time.sleep(1)
continue continue
# 所有步骤都成功 # 所有步骤都成功
print(f"文件 {file_path} 上传成功") logger.info(f"[OSS] 文件 {file_path} 上传成功")
return True return {"success": True, "original_path": file_path, "oss_path": oss_path}
except Exception as e: except Exception as e:
print(f"上传过程异常: {str(e)},重试 {retry_count + 1}/{max_retries}") logger.error(f"[OSS] 上传过程异常: {str(e)},重试 {retry_count + 1}/{max_retries}")
retry_count += 1 retry_count += 1
time.sleep(1) time.sleep(1)
print(f"文件 {file_path} 上传失败,已达到最大重试次数 {max_retries}") logger.error(f"[OSS] 文件 {file_path} 上传失败,已达到最大重试次数 {max_retries}")
return False return {"success": False, "original_path": file_path, "oss_path": oss_path}
if __name__ == "__main__": if __name__ == "__main__":
# python -m unilabos.app.oss_upload -f /path/to/your/file.txt # python -m unilabos.app.oss_upload -f /path/to/your/file.txt --driver HPLC --type test
# python -m unilabos.app.oss_upload -f /path/to/your/file.txt --driver HPLC --type test \
# --ak xxx --sk yyy --remote-addr http://xxx/api/v1
# 命令行参数解析 # 命令行参数解析
parser = argparse.ArgumentParser(description='文件上传测试工具') parser = argparse.ArgumentParser(description="文件上传测试工具")
parser.add_argument('--file', '-f', type=str, required=True, help='要上传的本地文件路径') parser.add_argument("--file", "-f", type=str, required=True, help="要上传的本地文件路径")
parser.add_argument('--path', '-p', type=str, default='/HPLC1/Any', help='OSS目标路径') parser.add_argument("--driver", "-d", type=str, default="default", help="驱动名称")
parser.add_argument('--device', '-d', type=str, default='test-device', help='设备ID') parser.add_argument("--type", "-t", type=str, default="default", help="实验类型")
parser.add_argument('--process', '-k', type=str, default='HPLC-txt-result', help='处理键') parser.add_argument("--ak", type=str, help="Access Key如果提供则覆盖配置")
parser.add_argument("--sk", type=str, help="Secret Key如果提供则覆盖配置")
parser.add_argument("--remote-addr", type=str, help="远程服务器地址(包含/api/v1如果提供则覆盖配置")
args = parser.parse_args() args = parser.parse_args()
# 检查文件是否存在 # 检查文件是否存在
if not os.path.exists(args.file): if not os.path.exists(args.file):
print(f"错误:文件 {args.file} 不存在") logger.error(f"错误:文件 {args.file} 不存在")
exit(1) exit(1)
print("=" * 50) # 如果提供了ak/sk/remote_addr创建临时HTTPClient
print(f"开始上传文件: {args.file}") temp_client = None
print(f"目标路径: {args.path}") if args.ak and args.sk:
print(f"设备ID: {args.device}") import base64
print(f"处理键: {args.process}")
print("=" * 50) auth = base64.b64encode(f"{args.ak}:{args.sk}".encode("utf-8")).decode("utf-8")
remote_addr = args.remote_addr if args.remote_addr else http_client.remote_addr
temp_client = HTTPClient(remote_addr=remote_addr, auth=auth)
logger.info(f"[配置] 使用自定义配置: remote_addr={remote_addr}")
elif args.remote_addr:
temp_client = HTTPClient(remote_addr=args.remote_addr, auth=http_client.auth)
logger.info(f"[配置] 使用自定义remote_addr: {args.remote_addr}")
else:
logger.info(f"[配置] 使用默认配置: remote_addr={http_client.remote_addr}")
logger.info("=" * 50)
logger.info(f"开始上传文件: {args.file}")
logger.info(f"驱动名称: {args.driver}")
logger.info(f"实验类型: {args.type}")
logger.info(f"Scene: {args.driver}-{args.type}")
logger.info("=" * 50)
# 执行上传 # 执行上传
success = oss_upload( result = oss_upload(
file_path=args.file, file_path=args.file,
oss_path=args.path,
filename=None, # 使用默认文件名 filename=None, # 使用默认文件名
process_key=args.process, driver_name=args.driver,
device_id=args.device exp_type=args.type,
client=temp_client,
) )
# 输出结果 # 输出结果
if success: if result["success"]:
print("\n√ 文件上传成功!") logger.info(f"\n√ 文件上传成功!")
logger.info(f"原始路径: {result['original_path']}")
logger.info(f"OSS路径: {result['oss_path']}")
exit(0) exit(0)
else: else:
print("\n× 文件上传失败!") logger.error(f"\n× 文件上传失败!")
logger.error(f"原始路径: {result['original_path']}")
exit(1) exit(1)

View File

@@ -39,15 +39,6 @@ class WSConfig:
ping_interval = 30 # ping间隔 ping_interval = 30 # ping间隔
# OSS上传配置
class OSSUploadConfig:
api_host = ""
authorization = ""
init_endpoint = ""
complete_endpoint = ""
max_retries = 3
# HTTP配置 # HTTP配置
class HTTPConfig: class HTTPConfig:
remote_addr = "http://127.0.0.1:48197/api/v1" remote_addr = "http://127.0.0.1:48197/api/v1"

View File

@@ -405,9 +405,19 @@ class RunningResultChecker(DriverChecker):
for i in range(self.driver._finished, temp): for i in range(self.driver._finished, temp):
sample_id = self.driver._get_resource_sample_id(self.driver._wf_name, i) # 从0开始计数 sample_id = self.driver._get_resource_sample_id(self.driver._wf_name, i) # 从0开始计数
pdf, txt = self.driver.get_data_file(i + 1) pdf, txt = self.driver.get_data_file(i + 1)
device_id = self.driver.device_id if hasattr(self.driver, "device_id") else "default" # 使用新的OSS上传接口传入driver_name和exp_type
oss_upload(pdf, f"hplc/{sample_id}/{os.path.basename(pdf)}", process_key="example", device_id=device_id) pdf_result = oss_upload(pdf, filename=os.path.basename(pdf), driver_name="HPLC", exp_type="analysis")
oss_upload(txt, f"hplc/{sample_id}/{os.path.basename(txt)}", process_key="HPLC-txt-result", device_id=device_id) txt_result = oss_upload(txt, filename=os.path.basename(txt), driver_name="HPLC", exp_type="result")
if pdf_result["success"]:
print(f"PDF上传成功: {pdf_result['oss_path']}")
else:
print(f"PDF上传失败: {pdf_result['original_path']}")
if txt_result["success"]:
print(f"TXT上传成功: {txt_result['oss_path']}")
else:
print(f"TXT上传失败: {txt_result['original_path']}")
# self.driver.extract_data_from_txt() # self.driver.extract_data_from_txt()
except Exception as ex: except Exception as ex:
self.driver._finished = 0 self.driver._finished = 0
@@ -456,8 +466,12 @@ if __name__ == "__main__":
} }
sample_id = obj._get_resource_sample_id("test", 0) sample_id = obj._get_resource_sample_id("test", 0)
pdf, txt = obj.get_data_file("1", after_time=datetime(2024, 11, 6, 19, 3, 6)) pdf, txt = obj.get_data_file("1", after_time=datetime(2024, 11, 6, 19, 3, 6))
oss_upload(pdf, f"hplc/{sample_id}/{os.path.basename(pdf)}", process_key="example") # 使用新的OSS上传接口传入driver_name和exp_type
oss_upload(txt, f"hplc/{sample_id}/{os.path.basename(txt)}", process_key="HPLC-txt-result") pdf_result = oss_upload(pdf, filename=os.path.basename(pdf), driver_name="HPLC", exp_type="analysis")
txt_result = oss_upload(txt, filename=os.path.basename(txt), driver_name="HPLC", exp_type="result")
print(f"PDF上传结果: {pdf_result}")
print(f"TXT上传结果: {txt_result}")
# driver = HPLCDriver() # driver = HPLCDriver()
# for i in range(10000): # for i in range(10000):
# print({k: v for k, v in driver._device_status.items() if isinstance(v, str)}) # print({k: v for k, v in driver._device_status.items() if isinstance(v, str)})

View File

@@ -192,6 +192,23 @@ class BioyondV1RPC(BaseRequest):
return [] return []
return str(response.get("data", {})) return str(response.get("data", {}))
def material_type_list(self) -> list:
"""查询物料类型列表
返回值:
list: 物料类型数组,失败返回空列表
"""
response = self.post(
url=f'{self.host}/api/lims/storage/material-type-list',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": {},
})
if not response or response['code'] != 1:
return []
return response.get("data", [])
def material_inbound(self, material_id: str, location_id: str) -> dict: def material_inbound(self, material_id: str, location_id: str) -> dict:
""" """
描述:指定库位入库一个物料 描述:指定库位入库一个物料
@@ -221,6 +238,26 @@ class BioyondV1RPC(BaseRequest):
# 入库成功时,即使没有 data 字段,也返回成功标识 # 入库成功时,即使没有 data 字段,也返回成功标识
return response.get("data") or {"success": True} return response.get("data") or {"success": True}
def batch_inbound(self, inbound_items: List[Dict[str, Any]]) -> int:
"""批量入库物料
参数:
inbound_items: 入库条目列表,每项包含 materialId/locationId/quantity 等
返回值:
int: 成功返回1失败返回0
"""
response = self.post(
url=f'{self.host}/api/lims/storage/batch-inbound',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": inbound_items,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def delete_material(self, material_id: str) -> dict: def delete_material(self, material_id: str) -> dict:
""" """
描述:删除尚未入库的物料 描述:删除尚未入库的物料
@@ -289,6 +326,66 @@ class BioyondV1RPC(BaseRequest):
return None return None
return response return response
def batch_outbound(self, outbound_items: List[Dict[str, Any]]) -> int:
"""批量出库物料
参数:
outbound_items: 出库条目列表,每项包含 materialId/locationId/quantity 等
返回值:
int: 成功返回1失败返回0
"""
response = self.post(
url=f'{self.host}/api/lims/storage/batch-outbound',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": outbound_items,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def material_info(self, material_id: str) -> dict:
"""查询物料详情
参数:
material_id: 物料ID
返回值:
dict: 物料信息字典,失败返回空字典
"""
response = self.post(
url=f'{self.host}/api/lims/storage/material-info',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": material_id,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def reset_location(self, location_id: str) -> int:
"""复位库位
参数:
location_id: 库位ID
返回值:
int: 成功返回1失败返回0
"""
response = self.post(
url=f'{self.host}/api/lims/storage/reset-location',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": location_id,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
# ==================== 工作流查询相关接口 ==================== # ==================== 工作流查询相关接口 ====================
def query_workflow(self, json_str: str) -> dict: def query_workflow(self, json_str: str) -> dict:
@@ -332,6 +429,66 @@ class BioyondV1RPC(BaseRequest):
return {} return {}
return response.get("data", {}) return response.get("data", {})
def split_workflow_list(self, params: Dict[str, Any]) -> dict:
"""查询可拆分工作流列表
参数:
params: 查询条件参数
返回值:
dict: 返回数据字典,失败返回空字典
"""
response = self.post(
url=f'{self.host}/api/lims/workflow/split-workflow-list',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": params,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def merge_workflow(self, data: Dict[str, Any]) -> dict:
"""合并工作流(无参数版)
参数:
data: 合并请求体,包含待合并的子工作流信息
返回值:
dict: 合并结果,失败返回空字典
"""
response = self.post(
url=f'{self.host}/api/lims/workflow/merge-workflow',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": data,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def merge_workflow_with_parameters(self, data: Dict[str, Any]) -> dict:
"""合并工作流(携带参数)
参数:
data: 合并请求体,包含 name、workflows 以及 stepParameters 等
返回值:
dict: 合并结果,失败返回空字典
"""
response = self.post(
url=f'{self.host}/api/lims/workflow/merge-workflow-with-parameters',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": data,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def validate_workflow_parameters(self, workflows: List[Dict[str, Any]]) -> Dict[str, Any]: def validate_workflow_parameters(self, workflows: List[Dict[str, Any]]) -> Dict[str, Any]:
"""验证工作流参数格式""" """验证工作流参数格式"""
try: try:
@@ -494,35 +651,34 @@ class BioyondV1RPC(BaseRequest):
return {} return {}
return response.get("data", {}) return response.get("data", {})
def order_report(self, json_str: str) -> dict: def order_report(self, order_id: str) -> dict:
""" """查询订单报告
描述:查询某个任务明细
json_str 格式为JSON字符串:
'{"order_id": "order123"}'
"""
try:
data = json.loads(json_str)
order_id = data.get("order_id", "")
except json.JSONDecodeError:
return {}
参数:
order_id: 订单ID
返回值:
dict: 报告数据,失败返回空字典
"""
response = self.post( response = self.post(
url=f'{self.host}/api/lims/order/project-order-report', url=f'{self.host}/api/lims/order/order-report',
params={ params={
"apiKey": self.api_key, "apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(), "requestTime": self.get_current_time_iso8601(),
"data": order_id, "data": order_id,
}) })
if not response or response['code'] != 1: if not response or response['code'] != 1:
return {} return {}
return response.get("data", {}) return response.get("data", {})
def order_takeout(self, json_str: str) -> int: def order_takeout(self, json_str: str) -> int:
""" """取出任务产物
描述:取出任务产物
json_str 格式为JSON字符串: 参数:
'{"order_id": "order123", "preintake_id": "preintake123"}' json_str: JSON字符串包含 order_id 与 preintake_id
返回值:
int: 成功返回1失败返回0
""" """
try: try:
data = json.loads(json_str) data = json.loads(json_str)
@@ -545,14 +701,15 @@ class BioyondV1RPC(BaseRequest):
return 0 return 0
return response.get("code", 0) return response.get("code", 0)
def sample_waste_removal(self, order_id: str) -> dict: def sample_waste_removal(self, order_id: str) -> dict:
""" """样品/废料取出
样品/废料取出接口
参数: 参数:
- order_id: 订单ID order_id: 订单ID
返回: 取出结果 返回:
dict: 取出结果,失败返回空字典
""" """
params = {"orderId": order_id} params = {"orderId": order_id}
@@ -574,10 +731,13 @@ class BioyondV1RPC(BaseRequest):
return response.get("data", {}) return response.get("data", {})
def cancel_order(self, json_str: str) -> bool: def cancel_order(self, json_str: str) -> bool:
""" """取消指定任务
描述:取消指定任务
json_str 格式为JSON字符串: 参数:
'{"order_id": "order123"}' json_str: JSON字符串包含 order_id
返回值:
bool: 成功返回 True失败返回 False
""" """
try: try:
data = json.loads(json_str) data = json.loads(json_str)
@@ -597,6 +757,126 @@ class BioyondV1RPC(BaseRequest):
return False return False
return True return True
def cancel_experiment(self, order_id: str) -> int:
"""取消指定实验
参数:
order_id: 订单ID
返回值:
int: 成功返回1失败返回0
"""
response = self.post(
url=f'{self.host}/api/lims/order/cancel-experiment',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_id,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def batch_cancel_experiment(self, order_ids: List[str]) -> int:
"""批量取消实验
参数:
order_ids: 订单ID列表
返回值:
int: 成功返回1失败返回0
"""
response = self.post(
url=f'{self.host}/api/lims/order/batch-cancel-experiment',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_ids,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def gantts_by_order_id(self, order_id: str) -> dict:
"""查询订单甘特图数据
参数:
order_id: 订单ID
返回值:
dict: 甘特数据,失败返回空字典
"""
response = self.post(
url=f'{self.host}/api/lims/order/gantts-by-order-id',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_id,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def simulation_gantt_by_order_id(self, order_id: str) -> dict:
"""查询订单模拟甘特图数据
参数:
order_id: 订单ID
返回值:
dict: 模拟甘特数据,失败返回空字典
"""
response = self.post(
url=f'{self.host}/api/lims/order/simulation-gantt-by-order-id',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_id,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
def reset_order_status(self, order_id: str) -> int:
"""复位订单状态
参数:
order_id: 订单ID
返回值:
int: 成功返回1失败返回0
"""
response = self.post(
url=f'{self.host}/api/lims/order/reset-order-status',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_id,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def gantt_with_simulation_by_order_id(self, order_id: str) -> dict:
"""查询订单甘特与模拟联合数据
参数:
order_id: 订单ID
返回值:
dict: 联合数据,失败返回空字典
"""
response = self.post(
url=f'{self.host}/api/lims/order/gantt-with-simulation-by-order-id',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": order_id,
})
if not response or response['code'] != 1:
return {}
return response.get("data", {})
# ==================== 设备管理相关接口 ==================== # ==================== 设备管理相关接口 ====================
def device_list(self, json_str: str = "") -> list: def device_list(self, json_str: str = "") -> list:
@@ -628,9 +908,13 @@ class BioyondV1RPC(BaseRequest):
return response.get("data", []) return response.get("data", [])
def device_operation(self, json_str: str) -> int: def device_operation(self, json_str: str) -> int:
""" """设备操作
描述:操作设备
json_str 格式为JSON字符串 参数:
json_str: JSON字符串包含 device_no/operationType/operationParams
返回值:
int: 成功返回1失败返回0
""" """
try: try:
data = json.loads(json_str) data = json.loads(json_str)
@@ -643,7 +927,7 @@ class BioyondV1RPC(BaseRequest):
return 0 return 0
response = self.post( response = self.post(
url=f'{self.host}/api/lims/device/device-operation', url=f'{self.host}/api/lims/device/execute-operation',
params={ params={
"apiKey": self.api_key, "apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(), "requestTime": self.get_current_time_iso8601(),
@@ -654,9 +938,30 @@ class BioyondV1RPC(BaseRequest):
return 0 return 0
return response.get("code", 0) return response.get("code", 0)
def reset_devices(self) -> int:
"""复位设备集合
返回值:
int: 成功返回1失败返回0
"""
response = self.post(
url=f'{self.host}/api/lims/device/reset-devices',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
# ==================== 调度器相关接口 ==================== # ==================== 调度器相关接口 ====================
def scheduler_status(self) -> dict: def scheduler_status(self) -> dict:
"""查询调度器状态
返回值:
dict: 包含 schedulerStatus/hasTask/creationTime 等
"""
response = self.post( response = self.post(
url=f'{self.host}/api/lims/scheduler/scheduler-status', url=f'{self.host}/api/lims/scheduler/scheduler-status',
params={ params={
@@ -669,7 +974,7 @@ class BioyondV1RPC(BaseRequest):
return response.get("data", {}) return response.get("data", {})
def scheduler_start(self) -> int: def scheduler_start(self) -> int:
"""描述:启动调度器""" """启动调度器"""
response = self.post( response = self.post(
url=f'{self.host}/api/lims/scheduler/start', url=f'{self.host}/api/lims/scheduler/start',
params={ params={
@@ -682,7 +987,7 @@ class BioyondV1RPC(BaseRequest):
return response.get("code", 0) return response.get("code", 0)
def scheduler_pause(self) -> int: def scheduler_pause(self) -> int:
"""描述:暂停调度器""" """暂停调度器"""
response = self.post( response = self.post(
url=f'{self.host}/api/lims/scheduler/pause', url=f'{self.host}/api/lims/scheduler/pause',
params={ params={
@@ -694,8 +999,21 @@ class BioyondV1RPC(BaseRequest):
return 0 return 0
return response.get("code", 0) return response.get("code", 0)
def scheduler_smart_pause(self) -> int:
"""智能暂停调度器"""
response = self.post(
url=f'{self.host}/api/lims/scheduler/smart-pause',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
def scheduler_continue(self) -> int: def scheduler_continue(self) -> int:
"""描述:继续调度器""" """继续调度器"""
response = self.post( response = self.post(
url=f'{self.host}/api/lims/scheduler/continue', url=f'{self.host}/api/lims/scheduler/continue',
params={ params={
@@ -708,7 +1026,7 @@ class BioyondV1RPC(BaseRequest):
return response.get("code", 0) return response.get("code", 0)
def scheduler_stop(self) -> int: def scheduler_stop(self) -> int:
"""描述:停止调度器""" """停止调度器"""
response = self.post( response = self.post(
url=f'{self.host}/api/lims/scheduler/stop', url=f'{self.host}/api/lims/scheduler/stop',
params={ params={
@@ -721,7 +1039,7 @@ class BioyondV1RPC(BaseRequest):
return response.get("code", 0) return response.get("code", 0)
def scheduler_reset(self) -> int: def scheduler_reset(self) -> int:
"""描述:复位调度器""" """复位调度器"""
response = self.post( response = self.post(
url=f'{self.host}/api/lims/scheduler/reset', url=f'{self.host}/api/lims/scheduler/reset',
params={ params={
@@ -733,6 +1051,26 @@ class BioyondV1RPC(BaseRequest):
return 0 return 0
return response.get("code", 0) return response.get("code", 0)
def scheduler_reply_error_handling(self, data: Dict[str, Any]) -> int:
"""调度错误处理回复
参数:
data: 错误处理参数
返回值:
int: 成功返回1失败返回0
"""
response = self.post(
url=f'{self.host}/api/lims/scheduler/reply-error-handling',
params={
"apiKey": self.api_key,
"requestTime": self.get_current_time_iso8601(),
"data": data,
})
if not response or response['code'] != 1:
return 0
return response.get("code", 0)
# ==================== 辅助方法 ==================== # ==================== 辅助方法 ====================
def _load_material_cache(self): def _load_material_cache(self):
@@ -796,3 +1134,23 @@ class BioyondV1RPC(BaseRequest):
def get_available_materials(self): def get_available_materials(self):
"""获取所有可用的材料名称列表""" """获取所有可用的材料名称列表"""
return list(self.material_cache.keys()) return list(self.material_cache.keys())
def get_scheduler_state(self) -> Optional[MachineState]:
"""将调度状态字符串映射为枚举值
返回值:
Optional[MachineState]: 映射后的枚举,失败返回 None
"""
data = self.scheduler_status()
if not isinstance(data, dict):
return None
status = data.get("schedulerStatus")
mapping = {
"Init": MachineState.INITIAL,
"Stop": MachineState.STOPPED,
"Running": MachineState.RUNNING,
"Pause": MachineState.PAUSED,
"ErrorPause": MachineState.ERROR_PAUSED,
"ErrorStop": MachineState.ERROR_STOPPED,
}
return mapping.get(status)

View File

@@ -1,11 +1,17 @@
from datetime import datetime from datetime import datetime
import json import json
import time import time
from typing import Optional, Dict, Any from typing import Optional, Dict, Any, List
import requests
from unilabos.devices.workstation.bioyond_studio.config import API_CONFIG
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondException from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import BioyondException
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode, BaseROS2DeviceNode
import json
import sys
from pathlib import Path
import importlib
class BioyondDispensingStation(BioyondWorkstation): class BioyondDispensingStation(BioyondWorkstation):
def __init__( def __init__(
@@ -28,6 +34,108 @@ class BioyondDispensingStation(BioyondWorkstation):
# 用于跟踪任务完成状态的字典: {orderCode: {status, order_id, timestamp}} # 用于跟踪任务完成状态的字典: {orderCode: {status, order_id, timestamp}}
self.order_completion_status = {} self.order_completion_status = {}
def _post_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]:
"""项目接口通用POST调用
参数:
endpoint: 接口路径(例如 /api/lims/order/brief-step-paramerers
data: 请求体中的 data 字段内容
返回:
dict: 服务端响应,失败时返回 {code:0,message,...}
"""
request_data = {
"apiKey": API_CONFIG["api_key"],
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": data
}
try:
response = requests.post(
f"{self.hardware_interface.host}{endpoint}",
json=request_data,
headers={"Content-Type": "application/json"},
timeout=30
)
result = response.json()
return result if isinstance(result, dict) else {"code": 0, "message": "非JSON响应"}
except json.JSONDecodeError:
return {"code": 0, "message": "非JSON响应"}
except requests.exceptions.Timeout:
return {"code": 0, "message": "请求超时"}
except requests.exceptions.RequestException as e:
return {"code": 0, "message": str(e)}
def _delete_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]:
"""项目接口通用DELETE调用
参数:
endpoint: 接口路径(例如 /api/lims/order/workflows
data: 请求体中的 data 字段内容
返回:
dict: 服务端响应,失败时返回 {code:0,message,...}
"""
request_data = {
"apiKey": API_CONFIG["api_key"],
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": data
}
try:
response = requests.delete(
f"{self.hardware_interface.host}{endpoint}",
json=request_data,
headers={"Content-Type": "application/json"},
timeout=30
)
result = response.json()
return result if isinstance(result, dict) else {"code": 0, "message": "非JSON响应"}
except json.JSONDecodeError:
return {"code": 0, "message": "非JSON响应"}
except requests.exceptions.Timeout:
return {"code": 0, "message": "请求超时"}
except requests.exceptions.RequestException as e:
return {"code": 0, "message": str(e)}
def compute_experiment_design(
self,
ratio: dict,
wt_percent: str = "0.25",
m_tot: str = "70",
titration_percent: str = "0.03",
) -> dict:
try:
if isinstance(ratio, str):
try:
ratio = json.loads(ratio)
except Exception:
ratio = {}
root = str(Path(__file__).resolve().parents[3])
if root not in sys.path:
sys.path.append(root)
try:
mod = importlib.import_module("tem.compute")
except Exception as e:
raise BioyondException(f"无法导入计算模块: {e}")
try:
wp = float(wt_percent) if isinstance(wt_percent, str) else wt_percent
mt = float(m_tot) if isinstance(m_tot, str) else m_tot
tp = float(titration_percent) if isinstance(titration_percent, str) else titration_percent
except Exception as e:
raise BioyondException(f"参数解析失败: {e}")
res = mod.generate_experiment_design(ratio=ratio, wt_percent=wp, m_tot=mt, titration_percent=tp)
out = {
"solutions": res.get("solutions", []),
"titration": res.get("titration", {}),
"solvents": res.get("solvents", {}),
"feeding_order": res.get("feeding_order", []),
"return_info": json.dumps(res, ensure_ascii=False)
}
return out
except BioyondException:
raise
except Exception as e:
raise BioyondException(str(e))
# 90%10%小瓶投料任务创建方法 # 90%10%小瓶投料任务创建方法
def create_90_10_vial_feeding_task(self, def create_90_10_vial_feeding_task(self,
order_name: str = None, order_name: str = None,
@@ -649,6 +757,40 @@ class BioyondDispensingStation(BioyondWorkstation):
self.hardware_interface._logger.error(error_msg) self.hardware_interface._logger.error(error_msg)
raise BioyondException(error_msg) raise BioyondException(error_msg)
def brief_step_parameters(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""获取简要步骤参数(站点项目接口)
参数:
data: 查询参数字典
返回值:
dict: 接口返回数据
"""
return self._post_project_api("/api/lims/order/brief-step-paramerers", data)
def project_order_report(self, order_id: str) -> Dict[str, Any]:
"""查询项目端订单报告(兼容旧路径)
参数:
order_id: 订单ID
返回值:
dict: 报告数据
"""
return self._post_project_api("/api/lims/order/project-order-report", order_id)
def workflow_sample_locations(self, workflow_id: str) -> Dict[str, Any]:
"""查询工作流样品库位(站点项目接口)
参数:
workflow_id: 工作流ID
返回值:
dict: 位置信息数据
"""
return self._post_project_api("/api/lims/storage/workflow-sample-locations", workflow_id)
# 批量创建90%10%小瓶投料任务 # 批量创建90%10%小瓶投料任务
def batch_create_90_10_vial_feeding_tasks(self, def batch_create_90_10_vial_feeding_tasks(self,
titration, titration,
@@ -779,8 +921,38 @@ class BioyondDispensingStation(BioyondWorkstation):
self.hardware_interface._logger.error(error_msg) self.hardware_interface._logger.error(error_msg)
raise BioyondException(error_msg) raise BioyondException(error_msg)
def _extract_actuals_from_report(self, report) -> Dict[str, Any]:
data = report.get('data') if isinstance(report, dict) else None
actual_target_weigh = None
actual_volume = None
if data:
extra = data.get('extraProperties') or {}
if isinstance(extra, dict):
for v in extra.values():
obj = None
try:
obj = json.loads(v) if isinstance(v, str) else v
except Exception:
obj = None
if isinstance(obj, dict):
tw = obj.get('targetWeigh')
vol = obj.get('volume')
if tw is not None:
try:
actual_target_weigh = float(tw)
except Exception:
pass
if vol is not None:
try:
actual_volume = float(vol)
except Exception:
pass
return {
'actualTargetWeigh': actual_target_weigh,
'actualVolume': actual_volume
}
# 等待多个任务完成并获取实验报告
def wait_for_multiple_orders_and_get_reports(self, def wait_for_multiple_orders_and_get_reports(self,
batch_create_result: str = None, batch_create_result: str = None,
timeout: int = 7200, timeout: int = 7200,
@@ -902,6 +1074,7 @@ class BioyondDispensingStation(BioyondWorkstation):
"status": "timeout", "status": "timeout",
"completion_status": None, "completion_status": None,
"report": None, "report": None,
"extracted": None,
"elapsed_time": elapsed_time "elapsed_time": elapsed_time
}) })
@@ -921,8 +1094,7 @@ class BioyondDispensingStation(BioyondWorkstation):
# 获取实验报告 # 获取实验报告
try: try:
report_query = json.dumps({"order_id": order_id}) report = self.project_order_report(order_id)
report = self.hardware_interface.order_report(report_query)
if not report: if not report:
self.hardware_interface._logger.warning( self.hardware_interface._logger.warning(
@@ -940,6 +1112,7 @@ class BioyondDispensingStation(BioyondWorkstation):
"status": "completed", "status": "completed",
"completion_status": completion_info.get('status'), "completion_status": completion_info.get('status'),
"report": report, "report": report,
"extracted": self._extract_actuals_from_report(report),
"elapsed_time": elapsed_time "elapsed_time": elapsed_time
}) })
@@ -959,6 +1132,7 @@ class BioyondDispensingStation(BioyondWorkstation):
"status": "error", "status": "error",
"completion_status": completion_info.get('status'), "completion_status": completion_info.get('status'),
"report": None, "report": None,
"extracted": None,
"error": str(e), "error": str(e),
"elapsed_time": elapsed_time "elapsed_time": elapsed_time
}) })
@@ -1052,6 +1226,266 @@ class BioyondDispensingStation(BioyondWorkstation):
self.hardware_interface._logger.error(f"处理任务完成报送失败: {e}") self.hardware_interface._logger.error(f"处理任务完成报送失败: {e}")
return {"processed": False, "error": str(e)} return {"processed": False, "error": str(e)}
def transfer_materials_to_reaction_station(
self,
target_device_id: str,
transfer_groups: list
) -> dict:
"""
将配液站完成的物料转移到指定反应站的堆栈库位
支持多组转移任务,每组包含物料名称、目标堆栈和目标库位
Args:
target_device_id: 目标反应站设备ID(所有转移组使用同一个设备)
transfer_groups: 转移任务组列表,每组包含:
- materials: 物料名称(字符串,将通过RPC查询)
- target_stack: 目标堆栈名称(如"堆栈1左")
- target_sites: 目标库位(如"A01")
Returns:
dict: 转移结果
{
"success": bool,
"total_groups": int,
"successful_groups": int,
"failed_groups": int,
"target_device_id": str,
"details": [...]
}
"""
try:
# 验证参数
if not target_device_id:
raise ValueError("目标设备ID不能为空")
if not transfer_groups:
raise ValueError("转移任务组列表不能为空")
if not isinstance(transfer_groups, list):
raise ValueError("transfer_groups必须是列表类型")
# 标准化设备ID格式: 确保以 /devices/ 开头
if not target_device_id.startswith("/devices/"):
if target_device_id.startswith("/"):
target_device_id = f"/devices{target_device_id}"
else:
target_device_id = f"/devices/{target_device_id}"
self.hardware_interface._logger.info(
f"目标设备ID标准化为: {target_device_id}"
)
self.hardware_interface._logger.info(
f"开始执行批量物料转移: {len(transfer_groups)}组任务 -> {target_device_id}"
)
from .config import WAREHOUSE_MAPPING
results = []
successful_count = 0
failed_count = 0
for idx, group in enumerate(transfer_groups, 1):
try:
# 提取参数
material_name = group.get("materials", "")
target_stack = group.get("target_stack", "")
target_sites = group.get("target_sites", "")
# 验证必填参数
if not material_name:
raise ValueError(f"{idx}组: 物料名称不能为空")
if not target_stack:
raise ValueError(f"{idx}组: 目标堆栈不能为空")
if not target_sites:
raise ValueError(f"{idx}组: 目标库位不能为空")
self.hardware_interface._logger.info(
f"处理第{idx}组转移: {material_name} -> "
f"{target_device_id}/{target_stack}/{target_sites}"
)
# 通过物料名称从deck获取ResourcePLR对象
try:
material_resource = self.deck.get_resource(material_name)
if not material_resource:
raise ValueError(f"在deck中未找到物料: {material_name}")
self.hardware_interface._logger.info(
f"从deck获取到物料 {material_name}: {material_resource}"
)
except Exception as e:
raise ValueError(
f"获取物料 {material_name} 失败: {str(e)}请确认物料已正确加载到deck中"
)
# 验证目标堆栈是否存在
if target_stack not in WAREHOUSE_MAPPING:
raise ValueError(
f"未知的堆栈名称: {target_stack}"
f"可选值: {list(WAREHOUSE_MAPPING.keys())}"
)
# 验证库位是否有效
stack_sites = WAREHOUSE_MAPPING[target_stack].get("site_uuids", {})
if target_sites not in stack_sites:
raise ValueError(
f"库位 {target_sites} 不存在于堆栈 {target_stack} 中,"
f"可选库位: {list(stack_sites.keys())}"
)
# 获取目标库位的UUID
target_site_uuid = stack_sites[target_sites]
if not target_site_uuid:
raise ValueError(
f"库位 {target_sites} 的 UUID 未配置,请在 WAREHOUSE_MAPPING 中完善"
)
# 目标位点包含UUID
future = ROS2DeviceNode.run_async_func(
self._ros_node.get_resource_with_dir,
True,
**{
"resource_id": f"/reaction_station_bioyond/Bioyond_Deck/{target_stack}",
"with_children": True,
},
)
# 等待异步完成后再获取结果
if not future:
raise ValueError(f"获取目标堆栈资源future无效: {target_stack}")
while not future.done():
time.sleep(0.1)
target_site_resource = future.result()
# 调用父类的 transfer_resource_to_another 方法
# 传入ResourcePLR对象和目标位点资源
future = self.transfer_resource_to_another(
resource=[material_resource],
mount_resource=[target_site_resource],
sites=[target_sites],
mount_device_id=target_device_id
)
# 等待异步任务完成(轮询直到完成,再取结果)
if future:
try:
while not future.done():
time.sleep(0.1)
future.result()
self.hardware_interface._logger.info(
f"异步转移任务已完成: {material_name}"
)
except Exception as e:
raise ValueError(f"转移任务执行失败: {str(e)}")
self.hardware_interface._logger.info(
f"{idx}组转移成功: {material_name} -> "
f"{target_device_id}/{target_stack}/{target_sites}"
)
successful_count += 1
results.append({
"group_index": idx,
"success": True,
"material_name": material_name,
"target_stack": target_stack,
"target_site": target_sites,
"message": "转移成功"
})
except Exception as e:
error_msg = f"{idx}组转移失败: {str(e)}"
self.hardware_interface._logger.error(error_msg)
failed_count += 1
results.append({
"group_index": idx,
"success": False,
"material_name": group.get("materials", ""),
"error": str(e)
})
# 返回汇总结果
return {
"success": failed_count == 0,
"total_groups": len(transfer_groups),
"successful_groups": successful_count,
"failed_groups": failed_count,
"target_device_id": target_device_id,
"details": results,
"message": f"完成 {len(transfer_groups)} 组转移任务到 {target_device_id}: "
f"{successful_count} 成功, {failed_count} 失败"
}
except Exception as e:
error_msg = f"批量转移物料失败: {str(e)}"
self.hardware_interface._logger.error(error_msg)
return {
"success": False,
"total_groups": len(transfer_groups) if transfer_groups else 0,
"successful_groups": 0,
"failed_groups": len(transfer_groups) if transfer_groups else 0,
"target_device_id": target_device_id if target_device_id else "",
"error": error_msg
}
def query_resource_by_name(self, material_name: str):
"""
通过物料名称查询资源对象(适用于Bioyond系统)
Args:
material_name: 物料名称
Returns:
物料ID或None
"""
try:
# Bioyond系统使用material_cache存储物料信息
if not hasattr(self.hardware_interface, 'material_cache'):
self.hardware_interface._logger.error(
"hardware_interface没有material_cache属性"
)
return None
material_cache = self.hardware_interface.material_cache
self.hardware_interface._logger.info(
f"查询物料 '{material_name}', 缓存中共有 {len(material_cache)} 个物料"
)
# 调试: 打印前几个物料信息
if material_cache:
cache_items = list(material_cache.items())[:5]
for name, material_id in cache_items:
self.hardware_interface._logger.debug(
f"缓存物料: name={name}, id={material_id}"
)
# 直接从缓存中查找
if material_name in material_cache:
material_id = material_cache[material_name]
self.hardware_interface._logger.info(
f"找到物料: {material_name} -> ID: {material_id}"
)
return material_id
self.hardware_interface._logger.warning(
f"未找到物料: {material_name} (缓存中无此物料)"
)
# 打印所有可用物料名称供参考
available_materials = list(material_cache.keys())
if available_materials:
self.hardware_interface._logger.info(
f"可用物料列表(前10个): {available_materials[:10]}"
)
return None
except Exception as e:
self.hardware_interface._logger.error(
f"查询物料失败 {material_name}: {str(e)}"
)
return None
if __name__ == "__main__": if __name__ == "__main__":
bioyond = BioyondDispensingStation(config={ bioyond = BioyondDispensingStation(config={

View File

@@ -5,6 +5,7 @@ from typing import List, Dict, Any
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation from unilabos.devices.workstation.bioyond_studio.station import BioyondWorkstation
from unilabos.devices.workstation.bioyond_studio.bioyond_rpc import MachineState
from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String from unilabos.ros.msgs.message_converter import convert_to_ros_msg, Float64, String
from unilabos.devices.workstation.bioyond_studio.config import ( from unilabos.devices.workstation.bioyond_studio.config import (
WORKFLOW_STEP_IDS, WORKFLOW_STEP_IDS,
@@ -717,8 +718,7 @@ class BioyondReactionStation(BioyondWorkstation):
if oc in self.order_completion_status: if oc in self.order_completion_status:
info = self.order_completion_status[oc] info = self.order_completion_status[oc]
try: try:
rq = json.dumps({"order_id": oid}) rep = self.hardware_interface.order_report(oid)
rep = self.hardware_interface.order_report(rq)
if not rep: if not rep:
rep = {"error": "无法获取报告"} rep = {"error": "无法获取报告"}
reports.append({ reports.append({
@@ -912,6 +912,106 @@ class BioyondReactionStation(BioyondWorkstation):
""" """
return self.hardware_interface.create_order(json_str) return self.hardware_interface.create_order(json_str)
def hard_delete_merged_workflows(self, workflow_ids: List[str]) -> Dict[str, Any]:
"""
调用新接口:硬删除合并后的工作流
Args:
workflow_ids: 要删除的工作流ID数组
Returns:
删除结果
"""
try:
if not isinstance(workflow_ids, list):
raise ValueError("workflow_ids必须是字符串数组")
return self._delete_project_api("/api/lims/order/workflows", workflow_ids)
except Exception as e:
print(f"❌ 硬删除异常: {str(e)}")
return {"code": 0, "message": str(e), "timestamp": int(time.time())}
# ==================== 项目接口通用方法 ====================
def _post_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]:
"""项目接口通用POST调用
参数:
endpoint: 接口路径(例如 /api/lims/order/skip-titration-steps
data: 请求体中的 data 字段内容
返回:
dict: 服务端响应,失败时返回 {code:0,message,...}
"""
request_data = {
"apiKey": API_CONFIG["api_key"],
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": data
}
print(f"\n📤 项目POST请求: {self.hardware_interface.host}{endpoint}")
print(json.dumps(request_data, indent=4, ensure_ascii=False))
try:
response = requests.post(
f"{self.hardware_interface.host}{endpoint}",
json=request_data,
headers={"Content-Type": "application/json"},
timeout=30
)
result = response.json()
if result.get("code") == 1:
print("✅ 请求成功")
else:
print(f"❌ 请求失败: {result.get('message','未知错误')}")
return result
except json.JSONDecodeError:
print("❌ 非JSON响应")
return {"code": 0, "message": "非JSON响应", "timestamp": int(time.time())}
except requests.exceptions.Timeout:
print("❌ 请求超时")
return {"code": 0, "message": "请求超时", "timestamp": int(time.time())}
except requests.exceptions.RequestException as e:
print(f"❌ 网络异常: {str(e)}")
return {"code": 0, "message": str(e), "timestamp": int(time.time())}
def _delete_project_api(self, endpoint: str, data: Any) -> Dict[str, Any]:
"""项目接口通用DELETE调用
参数:
endpoint: 接口路径(例如 /api/lims/order/workflows
data: 请求体中的 data 字段内容
返回:
dict: 服务端响应,失败时返回 {code:0,message,...}
"""
request_data = {
"apiKey": API_CONFIG["api_key"],
"requestTime": self.hardware_interface.get_current_time_iso8601(),
"data": data
}
print(f"\n📤 项目DELETE请求: {self.hardware_interface.host}{endpoint}")
print(json.dumps(request_data, indent=4, ensure_ascii=False))
try:
response = requests.delete(
f"{self.hardware_interface.host}{endpoint}",
json=request_data,
headers={"Content-Type": "application/json"},
timeout=30
)
result = response.json()
if result.get("code") == 1:
print("✅ 请求成功")
else:
print(f"❌ 请求失败: {result.get('message','未知错误')}")
return result
except json.JSONDecodeError:
print("❌ 非JSON响应")
return {"code": 0, "message": "非JSON响应", "timestamp": int(time.time())}
except requests.exceptions.Timeout:
print("❌ 请求超时")
return {"code": 0, "message": "请求超时", "timestamp": int(time.time())}
except requests.exceptions.RequestException as e:
print(f"❌ 网络异常: {str(e)}")
return {"code": 0, "message": str(e), "timestamp": int(time.time())}
# ==================== 工作流执行核心方法 ==================== # ==================== 工作流执行核心方法 ====================
def process_web_workflows(self, web_workflow_json: str) -> List[Dict[str, str]]: def process_web_workflows(self, web_workflow_json: str) -> List[Dict[str, str]]:
@@ -942,76 +1042,6 @@ class BioyondReactionStation(BioyondWorkstation):
print(f"错误:处理工作流失败: {e}") print(f"错误:处理工作流失败: {e}")
return [] return []
def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict:
"""
一站式处理工作流程:解析网页工作流列表,合并工作流(带参数),然后发布任务
Args:
workflow_name: 合并后的工作流名称
task_name: 任务名称
Returns:
任务创建结果
"""
web_workflow_list = self.get_workflow_sequence()
print(f"\n{'='*60}")
print(f"📋 处理网页工作流列表: {web_workflow_list}")
print(f"{'='*60}")
web_workflow_json = json.dumps({"web_workflow_list": web_workflow_list})
workflows_result = self.process_web_workflows(web_workflow_json)
if not workflows_result:
return self._create_error_result("处理网页工作流列表失败", "process_web_workflows")
print(f"workflows_result 类型: {type(workflows_result)}")
print(f"workflows_result 内容: {workflows_result}")
workflows_with_params = self._build_workflows_with_parameters(workflows_result)
merge_data = {
"name": workflow_name,
"workflows": workflows_with_params
}
# print(f"\n🔄 合并工作流(带参数),名称: {workflow_name}")
merged_workflow = self.merge_workflow_with_parameters(json.dumps(merge_data))
if not merged_workflow:
return self._create_error_result("合并工作流失败", "merge_workflow_with_parameters")
workflow_id = merged_workflow.get("subWorkflows", [{}])[0].get("id", "")
# print(f"\n📤 使用工作流创建任务: {workflow_name} (ID: {workflow_id})")
order_params = [{
"orderCode": f"task_{self.hardware_interface.get_current_time_iso8601()}",
"orderName": task_name,
"workFlowId": workflow_id,
"borderNumber": 1,
"paramValues": {}
}]
result = self.create_order(json.dumps(order_params))
if not result:
return self._create_error_result("创建任务失败", "create_order")
# 清空工作流序列和参数,防止下次执行时累积重复
self.pending_task_params = []
self.clear_workflows() # 清空工作流序列,避免重复累积
# print(f"\n✅ 任务创建成功: {result}")
# print(f"\n✅ 任务创建成功")
print(f"{'='*60}\n")
# 返回结果,包含合并后的工作流数据和订单参数
return json.dumps({
"success": True,
"result": result,
"merged_workflow": merged_workflow,
"order_params": order_params
})
def _build_workflows_with_parameters(self, workflows_result: list) -> list: def _build_workflows_with_parameters(self, workflows_result: list) -> list:
""" """
构建带参数的工作流列表 构建带参数的工作流列表
@@ -1211,3 +1241,90 @@ class BioyondReactionStation(BioyondWorkstation):
print(f" ❌ 工作流ID验证失败: {e}") print(f" ❌ 工作流ID验证失败: {e}")
print(f" 💡 将重新合并工作流") print(f" 💡 将重新合并工作流")
return False return False
def process_and_execute_workflow(self, workflow_name: str, task_name: str) -> dict:
"""
一站式处理工作流程:解析网页工作流列表,合并工作流(带参数),然后发布任务
Args:
workflow_name: 合并后的工作流名称
task_name: 任务名称
Returns:
任务创建结果
"""
web_workflow_list = self.get_workflow_sequence()
print(f"\n{'='*60}")
print(f"📋 处理网页工作流列表: {web_workflow_list}")
print(f"{'='*60}")
web_workflow_json = json.dumps({"web_workflow_list": web_workflow_list})
workflows_result = self.process_web_workflows(web_workflow_json)
if not workflows_result:
return self._create_error_result("处理网页工作流列表失败", "process_web_workflows")
print(f"workflows_result 类型: {type(workflows_result)}")
print(f"workflows_result 内容: {workflows_result}")
workflows_with_params = self._build_workflows_with_parameters(workflows_result)
merge_data = {
"name": workflow_name,
"workflows": workflows_with_params
}
# print(f"\n🔄 合并工作流(带参数),名称: {workflow_name}")
merged_workflow = self.merge_workflow_with_parameters(json.dumps(merge_data))
if not merged_workflow:
return self._create_error_result("合并工作流失败", "merge_workflow_with_parameters")
workflow_id = merged_workflow.get("subWorkflows", [{}])[0].get("id", "")
# print(f"\n📤 使用工作流创建任务: {workflow_name} (ID: {workflow_id})")
order_params = [{
"orderCode": f"task_{self.hardware_interface.get_current_time_iso8601()}",
"orderName": task_name,
"workFlowId": workflow_id,
"borderNumber": 1,
"paramValues": {}
}]
result = self.create_order(json.dumps(order_params))
if not result:
return self._create_error_result("创建任务失败", "create_order")
# 清空工作流序列和参数,防止下次执行时累积重复
self.pending_task_params = []
self.clear_workflows() # 清空工作流序列,避免重复累积
# print(f"\n✅ 任务创建成功: {result}")
# print(f"\n✅ 任务创建成功")
print(f"{'='*60}\n")
# 返回结果,包含合并后的工作流数据和订单参数
return json.dumps({
"success": True,
"result": result,
"merged_workflow": merged_workflow,
"order_params": order_params
})
# ==================== 反应器操作接口 ====================
def skip_titration_steps(self, preintake_id: str) -> Dict[str, Any]:
"""跳过当前正在进行的滴定步骤
Args:
preintake_id: 通量ID
Returns:
Dict[str, Any]: 服务器响应,包含状态码、消息和时间戳
"""
try:
return self._post_project_api("/api/lims/order/skip-titration-steps", preintake_id)
except Exception as e:
print(f"❌ 跳过滴定异常: {str(e)}")
return {"code": 0, "message": str(e), "timestamp": int(time.time())}

View File

@@ -15,8 +15,9 @@ from unilabos.resources.bioyond.warehouses import (
bioyond_warehouse_5x1x1, bioyond_warehouse_5x1x1,
bioyond_warehouse_1x8x4, bioyond_warehouse_1x8x4,
bioyond_warehouse_reagent_storage, bioyond_warehouse_reagent_storage,
bioyond_warehouse_liquid_preparation, # bioyond_warehouse_liquid_preparation,
bioyond_warehouse_tipbox_storage, # 新增Tip盒堆栈 bioyond_warehouse_tipbox_storage, # 新增Tip盒堆栈
bioyond_warehouse_density_vial,
) )
@@ -43,17 +44,20 @@ class BIOYOND_PolymerReactionStation_Deck(Deck):
"堆栈1左": bioyond_warehouse_1x4x4("堆栈1左"), # 左侧堆栈: A01D04 "堆栈1左": bioyond_warehouse_1x4x4("堆栈1左"), # 左侧堆栈: A01D04
"堆栈1右": bioyond_warehouse_1x4x4_right("堆栈1右"), # 右侧堆栈: A05D08 "堆栈1右": bioyond_warehouse_1x4x4_right("堆栈1右"), # 右侧堆栈: A05D08
"站内试剂存放堆栈": bioyond_warehouse_reagent_storage("站内试剂存放堆栈"), # A01A02 "站内试剂存放堆栈": bioyond_warehouse_reagent_storage("站内试剂存放堆栈"), # A01A02
"移液站内10%分装液体准备仓库": bioyond_warehouse_liquid_preparation("移液站内10%分装液体准备仓库"), # A01B04 # "移液站内10%分装液体准备仓库": bioyond_warehouse_liquid_preparation("移液站内10%分装液体准备仓库"), # A01B04
"站内Tip盒堆栈": bioyond_warehouse_tipbox_storage("站内Tip盒堆栈"), # A01B03, 存放枪头盒 "站内Tip盒堆栈": bioyond_warehouse_tipbox_storage("站内Tip盒堆栈"), # A01B03, 存放枪头盒.
"测量小瓶仓库(测密度)": bioyond_warehouse_density_vial("测量小瓶仓库(测密度)"), # A01B03
} }
self.warehouse_locations = { self.warehouse_locations = {
"堆栈1左": Coordinate(0.0, 430.0, 0.0), # 左侧位置 "堆栈1左": Coordinate(0.0, 430.0, 0.0), # 左侧位置
"堆栈1右": Coordinate(2500.0, 430.0, 0.0), # 右侧位置 "堆栈1右": Coordinate(2500.0, 430.0, 0.0), # 右侧位置
"站内试剂存放堆栈": Coordinate(640.0, 480.0, 0.0), "站内试剂存放堆栈": Coordinate(640.0, 480.0, 0.0),
"移液站内10%分装液体准备仓库": Coordinate(1200.0, 600.0, 0.0), # "移液站内10%分装液体准备仓库": Coordinate(1200.0, 600.0, 0.0),
"站内Tip盒堆栈": Coordinate(300.0, 150.0, 0.0), "站内Tip盒堆栈": Coordinate(300.0, 150.0, 0.0),
"测量小瓶仓库(测密度)": Coordinate(922.0, 552.0, 0.0),
} }
self.warehouses["站内试剂存放堆栈"].rotation = Rotation(z=90) self.warehouses["站内试剂存放堆栈"].rotation = Rotation(z=90)
self.warehouses["测量小瓶仓库(测密度)"].rotation = Rotation(z=270)
for warehouse_name, warehouse in self.warehouses.items(): for warehouse_name, warehouse in self.warehouses.items():
self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name]) self.assign_child_resource(warehouse, location=self.warehouse_locations[warehouse_name])

View File

@@ -1,5 +1,6 @@
from unilabos.resources.warehouse import WareHouse, warehouse_factory from unilabos.resources.warehouse import WareHouse, warehouse_factory
# ================ 反应站相关堆栈 ================
def bioyond_warehouse_1x4x4(name: str) -> WareHouse: def bioyond_warehouse_1x4x4(name: str) -> WareHouse:
"""创建BioYond 4x4x1仓库 (左侧堆栈: A01D04) """创建BioYond 4x4x1仓库 (左侧堆栈: A01D04)
@@ -26,7 +27,6 @@ def bioyond_warehouse_1x4x4(name: str) -> WareHouse:
layout="row-major", # ⭐ 改为行优先排序 layout="row-major", # ⭐ 改为行优先排序
) )
def bioyond_warehouse_1x4x4_right(name: str) -> WareHouse: def bioyond_warehouse_1x4x4_right(name: str) -> WareHouse:
"""创建BioYond 4x4x1仓库 (右侧堆栈: A05D08)""" """创建BioYond 4x4x1仓库 (右侧堆栈: A05D08)"""
return warehouse_factory( return warehouse_factory(
@@ -45,15 +45,35 @@ def bioyond_warehouse_1x4x4_right(name: str) -> WareHouse:
layout="row-major", # ⭐ 改为行优先排序 layout="row-major", # ⭐ 改为行优先排序
) )
def bioyond_warehouse_density_vial(name: str) -> WareHouse:
"""创建测量小瓶仓库(测密度) A01B03"""
def bioyond_warehouse_1x4x2(name: str) -> WareHouse:
"""创建BioYond 4x1x2仓库"""
return warehouse_factory( return warehouse_factory(
name=name, name=name,
num_items_x=1, num_items_x=3, # 3列01-03
num_items_y=4, num_items_y=2, # 2行A-B
num_items_z=2, num_items_z=1, # 1层
dx=10.0,
dy=10.0,
dz=10.0,
item_dx=40.0,
item_dy=40.0,
item_dz=50.0,
# 用更小的 resource_size 来表现 "小点的孔位"
resource_size_x=30.0,
resource_size_y=30.0,
resource_size_z=12.0,
category="warehouse",
col_offset=0,
layout="row-major",
)
def bioyond_warehouse_reagent_storage(name: str) -> WareHouse:
"""创建BioYond站内试剂存放堆栈A01A02, 1行×2列"""
return warehouse_factory(
name=name,
num_items_x=2, # 2列01-02
num_items_y=1, # 1行A
num_items_z=1, # 1层
dx=10.0, dx=10.0,
dy=10.0, dy=10.0,
dz=10.0, dz=10.0,
@@ -61,9 +81,46 @@ def bioyond_warehouse_1x4x2(name: str) -> WareHouse:
item_dy=96.0, item_dy=96.0,
item_dz=120.0, item_dz=120.0,
category="warehouse", category="warehouse",
removed_positions=None
) )
def bioyond_warehouse_tipbox_storage(name: str) -> WareHouse:
"""创建BioYond站内Tip盒堆栈A01B03用于存放枪头盒"""
return warehouse_factory(
name=name,
num_items_x=3, # 3列01-03
num_items_y=2, # 2行A-B
num_items_z=1, # 1层
dx=10.0,
dy=10.0,
dz=10.0,
item_dx=137.0,
item_dy=96.0,
item_dz=120.0,
category="warehouse",
col_offset=0,
layout="row-major",
)
def bioyond_warehouse_liquid_preparation(name: str) -> WareHouse:
"""已弃用,创建BioYond移液站内10%分装液体准备仓库A01B04"""
return warehouse_factory(
name=name,
num_items_x=4, # 4列01-04
num_items_y=2, # 2行A-B
num_items_z=1, # 1层
dx=10.0,
dy=10.0,
dz=10.0,
item_dx=137.0,
item_dy=96.0,
item_dz=120.0,
category="warehouse",
col_offset=0,
layout="row-major",
)
# ================ 配液站相关堆栈 ================
def bioyond_warehouse_reagent_stack(name: str) -> WareHouse: def bioyond_warehouse_reagent_stack(name: str) -> WareHouse:
"""创建BioYond 试剂堆栈 2x4x1 (2行×4列: A01-A04, B01-B04) """创建BioYond 试剂堆栈 2x4x1 (2行×4列: A01-A04, B01-B04)
@@ -88,8 +145,28 @@ def bioyond_warehouse_reagent_stack(name: str) -> WareHouse:
) )
# 定义bioyond的堆栈 # 定义bioyond的堆栈
# =================== Other ===================
def bioyond_warehouse_1x4x2(name: str) -> WareHouse:
"""创建BioYond 4x2x1仓库"""
return warehouse_factory(
name=name,
num_items_x=1,
num_items_y=4,
num_items_z=2,
dx=10.0,
dy=10.0,
dz=10.0,
item_dx=137.0,
item_dy=96.0,
item_dz=120.0,
category="warehouse",
removed_positions=None
)
def bioyond_warehouse_1x2x2(name: str) -> WareHouse: def bioyond_warehouse_1x2x2(name: str) -> WareHouse:
"""创建BioYond 4x1x4仓库""" """创建BioYond 1x2x2仓库"""
return warehouse_factory( return warehouse_factory(
name=name, name=name,
num_items_x=1, num_items_x=1,
@@ -103,8 +180,9 @@ def bioyond_warehouse_1x2x2(name: str) -> WareHouse:
item_dz=120.0, item_dz=120.0,
category="warehouse", category="warehouse",
) )
def bioyond_warehouse_10x1x1(name: str) -> WareHouse: def bioyond_warehouse_10x1x1(name: str) -> WareHouse:
"""创建BioYond 4x1x4仓库""" """创建BioYond 10x1x1仓库"""
return warehouse_factory( return warehouse_factory(
name=name, name=name,
num_items_x=10, num_items_x=10,
@@ -118,8 +196,9 @@ def bioyond_warehouse_10x1x1(name: str) -> WareHouse:
item_dz=120.0, item_dz=120.0,
category="warehouse", category="warehouse",
) )
def bioyond_warehouse_1x3x3(name: str) -> WareHouse: def bioyond_warehouse_1x3x3(name: str) -> WareHouse:
"""创建BioYond 4x1x4仓库""" """创建BioYond 1x3x3仓库"""
return warehouse_factory( return warehouse_factory(
name=name, name=name,
num_items_x=1, num_items_x=1,
@@ -133,8 +212,9 @@ def bioyond_warehouse_1x3x3(name: str) -> WareHouse:
item_dz=120.0, item_dz=120.0,
category="warehouse", category="warehouse",
) )
def bioyond_warehouse_2x1x3(name: str) -> WareHouse: def bioyond_warehouse_2x1x3(name: str) -> WareHouse:
"""创建BioYond 4x1x4仓库""" """创建BioYond 2x1x3仓库"""
return warehouse_factory( return warehouse_factory(
name=name, name=name,
num_items_x=2, num_items_x=2,
@@ -150,7 +230,7 @@ def bioyond_warehouse_2x1x3(name: str) -> WareHouse:
) )
def bioyond_warehouse_3x3x1(name: str) -> WareHouse: def bioyond_warehouse_3x3x1(name: str) -> WareHouse:
"""创建BioYond 4x1x4仓库""" """创建BioYond 3x3x1仓库"""
return warehouse_factory( return warehouse_factory(
name=name, name=name,
num_items_x=3, num_items_x=3,
@@ -164,8 +244,9 @@ def bioyond_warehouse_3x3x1(name: str) -> WareHouse:
item_dz=120.0, item_dz=120.0,
category="warehouse", category="warehouse",
) )
def bioyond_warehouse_5x1x1(name: str) -> WareHouse: def bioyond_warehouse_5x1x1(name: str) -> WareHouse:
"""创建BioYond 4x1x4仓库""" """已弃用:创建BioYond 5x1x1仓库"""
return warehouse_factory( return warehouse_factory(
name=name, name=name,
num_items_x=5, num_items_x=5,
@@ -179,8 +260,9 @@ def bioyond_warehouse_5x1x1(name: str) -> WareHouse:
item_dz=120.0, item_dz=120.0,
category="warehouse", category="warehouse",
) )
def bioyond_warehouse_3x3x1_2(name: str) -> WareHouse: def bioyond_warehouse_3x3x1_2(name: str) -> WareHouse:
"""创建BioYond 4x1x4仓库""" """已弃用:创建BioYond 3x3x1仓库"""
return warehouse_factory( return warehouse_factory(
name=name, name=name,
num_items_x=3, num_items_x=3,
@@ -212,7 +294,6 @@ def bioyond_warehouse_liquid_and_lid_handling(name: str) -> WareHouse:
removed_positions=None removed_positions=None
) )
def bioyond_warehouse_1x8x4(name: str) -> WareHouse: def bioyond_warehouse_1x8x4(name: str) -> WareHouse:
"""创建BioYond 8x4x1反应站堆栈A01D08""" """创建BioYond 8x4x1反应站堆栈A01D08"""
return warehouse_factory( return warehouse_factory(
@@ -228,58 +309,3 @@ def bioyond_warehouse_1x8x4(name: str) -> WareHouse:
item_dz=130.0, item_dz=130.0,
category="warehouse", category="warehouse",
) )
def bioyond_warehouse_reagent_storage(name: str) -> WareHouse:
"""创建BioYond站内试剂存放堆栈A01A02, 1行×2列"""
return warehouse_factory(
name=name,
num_items_x=2, # 2列01-02
num_items_y=1, # 1行A
num_items_z=1, # 1层
dx=10.0,
dy=10.0,
dz=10.0,
item_dx=137.0,
item_dy=96.0,
item_dz=120.0,
category="warehouse",
)
def bioyond_warehouse_liquid_preparation(name: str) -> WareHouse:
"""创建BioYond移液站内10%分装液体准备仓库A01B04"""
return warehouse_factory(
name=name,
num_items_x=4, # 4列01-04
num_items_y=2, # 2行A-B
num_items_z=1, # 1层
dx=10.0,
dy=10.0,
dz=10.0,
item_dx=137.0,
item_dy=96.0,
item_dz=120.0,
category="warehouse",
col_offset=0,
layout="row-major",
)
def bioyond_warehouse_tipbox_storage(name: str) -> WareHouse:
"""创建BioYond站内Tip盒堆栈A01B03用于存放枪头盒"""
return warehouse_factory(
name=name,
num_items_x=3, # 3列01-03
num_items_y=2, # 2行A-B
num_items_z=1, # 1层
dx=10.0,
dy=10.0,
dz=10.0,
item_dx=137.0,
item_dy=96.0,
item_dz=120.0,
category="warehouse",
col_offset=0,
layout="row-major",
)

View File

@@ -19,6 +19,9 @@ def warehouse_factory(
item_dx: float = 10.0, item_dx: float = 10.0,
item_dy: float = 10.0, item_dy: float = 10.0,
item_dz: float = 10.0, item_dz: float = 10.0,
resource_size_x: float = 127.0,
resource_size_y: float = 86.0,
resource_size_z: float = 25.0,
removed_positions: Optional[List[int]] = None, removed_positions: Optional[List[int]] = None,
empty: bool = False, empty: bool = False,
category: str = "warehouse", category: str = "warehouse",
@@ -50,8 +53,9 @@ def warehouse_factory(
_sites = create_homogeneous_resources( _sites = create_homogeneous_resources(
klass=ResourceHolder, klass=ResourceHolder,
locations=locations, locations=locations,
resource_size_x=127.0, resource_size_x=resource_size_x,
resource_size_y=86.0, resource_size_y=resource_size_y,
resource_size_z=resource_size_z,
name_prefix=name, name_prefix=name,
) )
len_x, len_y = (num_items_x, num_items_y) if num_items_z == 1 else (num_items_y, num_items_z) if num_items_x == 1 else (num_items_x, num_items_z) len_x, len_y = (num_items_x, num_items_y) if num_items_z == 1 else (num_items_y, num_items_z) if num_items_x == 1 else (num_items_x, num_items_z)

View File

@@ -752,6 +752,12 @@ class HostNode(BaseROS2DeviceNode):
if return_info_str is not None: if return_info_str is not None:
try: try:
return_info = json.loads(return_info_str) return_info = json.loads(return_info_str)
# 适配后端的一些额外处理
return_value = return_info.get("return_value")
if isinstance(return_value, dict):
unilabos_samples = return_info.get("unilabos_samples")
if isinstance(unilabos_samples, list):
return_info["unilabos_samples"] = unilabos_samples
suc = return_info.get("suc", False) suc = return_info.get("suc", False)
if not suc: if not suc:
status = "failed" status = "failed"