Update oss config

This commit is contained in:
Xuwznln
2025-11-18 20:17:53 +08:00
parent acf5fdebf8
commit d39662f65f
8 changed files with 409 additions and 401 deletions

View File

@@ -67,14 +67,6 @@ class WSConfig:
max_reconnect_attempts = 999 # 最大重连次数
ping_interval = 30 # ping间隔
# OSS上传配置
class OSSUploadConfig:
api_host = "" # API主机地址
authorization = "" # 授权信息
init_endpoint = "" # 初始化端点
complete_endpoint = "" # 完成端点
max_retries = 3 # 最大重试次数
# HTTP配置
class HTTPConfig:
remote_addr = "https://uni-lab.bohrium.com/api/v1" # 远程服务器地址
@@ -294,19 +286,7 @@ HTTP 客户端配置用于与云端服务通信:
- UAT 环境:`https://uni-lab.uat.bohrium.com/api/v1`
- 本地环境:`http://127.0.0.1:48197/api/v1`
### 4. OSSUploadConfig - OSS 上传配置
对象存储服务配置,用于文件上传功能:
| 参数 | 类型 | 默认值 | 说明 |
| ------------------- | ---- | ------ | -------------------- |
| `api_host` | str | `""` | OSS API 主机地址 |
| `authorization` | str | `""` | 授权认证信息 |
| `init_endpoint` | str | `""` | 上传初始化端点 |
| `complete_endpoint` | str | `""` | 上传完成端点 |
| `max_retries` | int | `3` | 上传失败最大重试次数 |
### 5. ROSConfig - ROS 配置
### 4. ROSConfig - ROS 配置
配置 ROS 消息转换器需要加载的模块:

View File

@@ -4,7 +4,8 @@
## 概述
注册表Registry是Uni-Lab的设备配置系统采用YAML格式定义设备的
注册表Registry Uni-Lab 的设备配置系统,采用 YAML 格式定义设备的:
- 可用动作Actions
- 状态类型Status Types
- 初始化参数Init Parameters
@@ -32,19 +33,19 @@
### 核心字段说明
| 字段名 | 类型 | 需要手写 | 说明 |
| ----------------- | ------ | -------- | ----------------------------------- |
| 设备标识符 | string | 是 | 设备的唯一名字,如 `mock_chiller` |
| class | object | 部分 | 设备的核心信息,必须配置 |
| description | string | 否 | 设备描述,系统默认给空字符串 |
| handles | array | 否 | 连接关系,默认为空 |
| icon | string | 否 | 图标路径,默认为空 |
| init_param_schema | object | 否 | 初始化参数,系统自动分析生成 |
| version | string | 否 | 版本号,默认 "1.0.0" |
| category | array | 否 | 设备分类,默认使用文件名 |
| config_info | array | 否 | 嵌套配置,默认为空 |
| file_path | string | 否 | 文件路径,系统自动设置 |
| registry_type | string | 否 | 注册表类型,自动设为 "device" |
| 字段名 | 类型 | 需要手写 | 说明 |
| ----------------- | ------ | -------- | --------------------------------- |
| 设备标识符 | string | 是 | 设备的唯一名字,如 `mock_chiller` |
| class | object | 部分 | 设备的核心信息,必须配置 |
| description | string | 否 | 设备描述,系统默认给空字符串 |
| handles | array | 否 | 连接关系,默认为空 |
| icon | string | 否 | 图标路径,默认为空 |
| init_param_schema | object | 否 | 初始化参数,系统自动分析生成 |
| version | string | 否 | 版本号,默认 "1.0.0" |
| category | array | 否 | 设备分类,默认使用文件名 |
| config_info | array | 否 | 嵌套配置,默认为空 |
| file_path | string | 否 | 文件路径,系统自动设置 |
| registry_type | string | 否 | 注册表类型,自动设为 "device" |
### class 字段详解
@@ -71,11 +72,11 @@ my_device:
# 动作配置(详见后文)
action_name:
type: UniLabJsonCommand
goal: {...}
result: {...}
goal: { ... }
result: { ... }
description: "设备描述"
version: "1.0.0"
description: '设备描述'
version: '1.0.0'
category:
- device_category
handles: []
@@ -101,21 +102,22 @@ my_device:
## 创建注册表的方式
### 方式1: 使用注册表编辑器(推荐)
### 方式 1: 使用注册表编辑器(推荐)
适合大多数场景,快速高效。
**步骤**
1. 启动Uni-Lab
2. 访问Web界面的"注册表编辑器"
3. 上传您的Python设备驱动文件
1. 启动 Uni-Lab
2. 访问 Web 界面的"注册表编辑器"
3. 上传您的 Python 设备驱动文件
4. 点击"分析文件"
5. 填写描述和图标
6. 点击"生成注册表"
7. 复制生成的YAML内容
7. 复制生成的 YAML 内容
8. 保存到 `unilabos/registry/devices/your_device.yaml`
### 方式2: 使用--complete_registry参数开发调试
### 方式 2: 使用--complete_registry 参数(开发调试)
适合开发阶段,自动补全配置。
@@ -125,7 +127,8 @@ unilab -g dev.json --complete_registry --registry_path ./my_registry
```
系统会:
1. 扫描Python类
1. 扫描 Python 类
2. 分析方法签名和类型
3. 自动生成缺失的字段
4. 保存到注册表文件
@@ -137,7 +140,7 @@ unilab -g dev.json --complete_registry --registry_path ./my_registry
启动系统时用 complete_registry=True 参数让系统自动补全
```
### 方式3: 手动编写(高级)
### 方式 3: 手动编写(高级)
适合需要精细控制或特殊需求的场景。
@@ -186,6 +189,7 @@ my_device:
| ROS 动作类型 | 标准 ROS 动作 | goal_default 和 schema |
**常用的 ROS 动作类型**
- `SendCmd`:发送简单命令
- `NavigateThroughPoses`:导航动作
- `SingleJointPosition`:单关节位置控制
@@ -251,11 +255,11 @@ heat_chill_start:
## 特殊类型的自动识别
### ResourceSlotDeviceSlot识别
### ResourceSlotDeviceSlot 识别
当您在驱动代码中使用这些特殊类型时,系统会自动识别并生成相应的前端选择器。
**Python驱动代码示例**
**Python 驱动代码示例**
```python
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
@@ -286,24 +290,24 @@ my_device:
device: device
devices: devices
placeholder_keys:
resource: unilabos_resources # 自动添加!
resources: unilabos_resources # 自动添加!
device: unilabos_devices # 自动添加!
devices: unilabos_devices # 自动添加!
resource: unilabos_resources # 自动添加!
resources: unilabos_resources # 自动添加!
device: unilabos_devices # 自动添加!
devices: unilabos_devices # 自动添加!
result:
success: success
```
### 识别规则
| Python类型 | placeholder_keys | 前端效果 |
|-----------|-------------------|---------|
| `ResourceSlot` | `unilabos_resources` | 单选资源下拉框 |
| Python 类型 | placeholder_keys | 前端效果 |
| -------------------- | -------------------- | -------------- |
| `ResourceSlot` | `unilabos_resources` | 单选资源下拉框 |
| `List[ResourceSlot]` | `unilabos_resources` | 多选资源下拉框 |
| `DeviceSlot` | `unilabos_devices` | 单选设备下拉框 |
| `List[DeviceSlot]` | `unilabos_devices` | 多选设备下拉框 |
| `DeviceSlot` | `unilabos_devices` | 单选设备下拉框 |
| `List[DeviceSlot]` | `unilabos_devices` | 多选设备下拉框 |
### 前端UI效果
### 前端 UI 效果
#### 单选资源
@@ -313,6 +317,7 @@ placeholder_keys:
```
**前端渲染**:
```
Source: [下拉选择框 ▼]
├── plate_1 (96孔板)
@@ -329,6 +334,7 @@ placeholder_keys:
```
**前端渲染**:
```
Targets: [多选下拉框 ▼]
☑ plate_1 (96孔板)
@@ -345,6 +351,7 @@ placeholder_keys:
```
**前端渲染**:
```
Pump: [下拉选择框 ▼]
├── pump_1 (注射泵A)
@@ -360,6 +367,7 @@ placeholder_keys:
```
**前端渲染**:
```
Sync Devices: [多选下拉框 ▼]
☑ heater_1 (加热器A)
@@ -367,11 +375,11 @@ Sync Devices: [多选下拉框 ▼]
☐ pump_1 (注射泵)
```
### 手动配置placeholder_keys
### 手动配置 placeholder_keys
如果需要手动添加或覆盖自动生成的placeholder_keys
如果需要手动添加或覆盖自动生成的 placeholder_keys
#### 场景1: 非标准参数名
#### 场景 1: 非标准参数名
```yaml
action_value_mappings:
@@ -384,7 +392,7 @@ action_value_mappings:
my_device_param: unilabos_devices
```
#### 场景2: 混合类型
#### 场景 2: 混合类型
```python
def mixed_params(
@@ -398,32 +406,33 @@ def mixed_params(
```yaml
placeholder_keys:
resource: unilabos_resources # 资源选择
device: unilabos_devices # 设备选择
resource: unilabos_resources # 资源选择
device: unilabos_devices # 设备选择
# normal_param不需要placeholder_keys
```
#### 场景3: 自定义选择器
#### 场景 3: 自定义选择器
```yaml
placeholder_keys:
special_param: custom_selector # 使用自定义选择器
special_param: custom_selector # 使用自定义选择器
```
## 系统自动生成的字段
### status_types
系统会扫描你的 Python 类从状态方法propertyget_方法自动生成这部分
系统会扫描你的 Python 类从状态方法propertyget\_方法自动生成这部分
```yaml
status_types:
current_temperature: float # 从 get_current_temperature() 或 @property current_temperature
is_heating: bool # 从 get_is_heating() 或 @property is_heating
status: str # 从 get_status() 或 @property status
is_heating: bool # 从 get_is_heating() 或 @property is_heating
status: str # 从 get_status() 或 @property status
```
**注意事项**
- 系统会查找所有 `get_` 开头的方法和 `@property` 装饰的属性
- 类型会自动转成相应的类型(如 `str``float``bool`
- 如果类型是 `Any``None` 或未知的,默认使用 `String`
@@ -459,20 +468,21 @@ init_param_schema:
```
**生成规则**
- `config` 部分:分析 `__init__` 方法的参数、类型和默认值
- `data` 部分:根据 `status_types` 生成前端显示用的类型定义
### 其他自动填充的字段
```yaml
version: '1.0.0' # 默认版本
category: ['文件名'] # 使用 yaml 文件名作为类别
description: '' # 默认为空
icon: '' # 默认为空
handles: [] # 默认空数组
config_info: [] # 默认空数组
version: '1.0.0' # 默认版本
category: ['文件名'] # 使用 yaml 文件名作为类别
description: '' # 默认为空
icon: '' # 默认为空
handles: [] # 默认空数组
config_info: [] # 默认空数组
file_path: '/path/to/file' # 系统自动填写
registry_type: 'device' # 自动设为设备类型
registry_type: 'device' # 自动设为设备类型
```
### handles 字段
@@ -510,7 +520,7 @@ config_info: # 嵌套配置,用于包含子设备
## 完整示例
### Python驱动代码
### Python 驱动代码
```python
# unilabos/devices/my_lab/liquid_handler.py
@@ -520,22 +530,22 @@ from typing import List, Dict, Any, Optional
class AdvancedLiquidHandler:
"""高级液体处理工作站"""
def __init__(self, config: Dict[str, Any]):
self.simulation = config.get('simulation', False)
self._status = "idle"
self._temperature = 25.0
@property
def status(self) -> str:
"""设备状态"""
return self._status
@property
def temperature(self) -> float:
"""当前温度"""
return self._temperature
def transfer(
self,
source: ResourceSlot,
@@ -545,7 +555,7 @@ class AdvancedLiquidHandler:
) -> Dict[str, Any]:
"""转移液体"""
return {"success": True}
def multi_transfer(
self,
source: ResourceSlot,
@@ -554,7 +564,7 @@ class AdvancedLiquidHandler:
) -> Dict[str, Any]:
"""多目标转移"""
return {"success": True}
def coordinate_with_heater(
self,
plate: ResourceSlot,
@@ -574,12 +584,12 @@ advanced_liquid_handler:
class:
module: unilabos.devices.my_lab.liquid_handler:AdvancedLiquidHandler
type: python
# 自动提取的状态类型
status_types:
status: str
temperature: float
# 自动生成的初始化参数
init_param_schema:
config:
@@ -597,7 +607,7 @@ advanced_liquid_handler:
required:
- status
type: object
# 动作映射
action_value_mappings:
transfer:
@@ -613,28 +623,28 @@ advanced_liquid_handler:
volume: 0.0
tip: null
placeholder_keys:
source: unilabos_resources # 自动添加
target: unilabos_resources # 自动添加
tip: unilabos_resources # 自动添加
source: unilabos_resources # 自动添加
target: unilabos_resources # 自动添加
tip: unilabos_resources # 自动添加
result:
success: success
schema:
description: "转移液体"
description: '转移液体'
properties:
goal:
properties:
source:
type: object
description: "源容器"
description: '源容器'
target:
type: object
description: "目标容器"
description: '目标容器'
volume:
type: number
description: "体积(μL)"
description: '体积(μL)'
tip:
type: object
description: "枪头(可选)"
description: '枪头(可选)'
required:
- source
- target
@@ -643,7 +653,7 @@ advanced_liquid_handler:
required:
- goal
type: object
multi_transfer:
type: UniLabJsonCommand
goal:
@@ -651,11 +661,11 @@ advanced_liquid_handler:
targets: targets
volumes: volumes
placeholder_keys:
source: unilabos_resources # 单选
targets: unilabos_resources # 多选
source: unilabos_resources # 单选
targets: unilabos_resources # 多选
result:
success: success
coordinate_with_heater:
type: UniLabJsonCommand
goal:
@@ -663,17 +673,17 @@ advanced_liquid_handler:
heater: heater
temperature: temperature
placeholder_keys:
plate: unilabos_resources # 资源选择
heater: unilabos_devices # 设备选择
plate: unilabos_resources # 资源选择
heater: unilabos_devices # 设备选择
result:
success: success
description: "高级液体处理工作站,支持多目标转移和设备协同"
version: "1.0.0"
description: '高级液体处理工作站,支持多目标转移和设备协同'
version: '1.0.0'
category:
- liquid_handling
handles: []
icon: ""
icon: ''
```
### 另一个完整示例:温度控制器
@@ -892,17 +902,18 @@ unilab -g dev.json --complete_registry
cat unilabos/registry/devices/my_device.yaml
```
### 2. 验证placeholder_keys
### 2. 验证 placeholder_keys
确认:
- ResourceSlot参数有 `unilabos_resources`
- DeviceSlot参数有 `unilabos_devices`
- List类型被正确识别
- ResourceSlot 参数有 `unilabos_resources`
- DeviceSlot 参数有 `unilabos_devices`
- List 类型被正确识别
### 3. 测试前端效果
1. 启动Uni-Lab
2. 访问Web界面
1. 启动 Uni-Lab
2. 访问 Web 界面
3. 选择设备
4. 调用动作
5. 检查是否显示正确的选择器
@@ -916,18 +927,21 @@ python -c "from unilabos.devices.my_module.my_device import MyDevice"
## 常见问题
### Q1: placeholder_keys没有自动生成
### Q1: placeholder_keys 没有自动生成
**检查**:
1. 是否使用了`--complete_registry`参数?
2. 类型注解是否正确?
```python
# ✓ 正确
def method(self, resource: ResourceSlot):
# ✗ 错误(缺少类型注解)
def method(self, resource):
```
3. 是否正确导入?
```python
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
@@ -935,9 +949,10 @@ python -c "from unilabos.devices.my_module.my_device import MyDevice"
### Q2: 前端显示普通输入框而不是选择器
**原因**: placeholder_keys未正确配置
**原因**: placeholder_keys 未正确配置
**解决**:
```yaml
# 检查YAML中是否有
placeholder_keys:
@@ -947,6 +962,7 @@ placeholder_keys:
### Q3: 多选不工作
**检查类型注解**:
```python
# ✓ 正确 - 会生成多选
def method(self, resources: List[ResourceSlot]):
@@ -960,13 +976,15 @@ def method(self, resources: ResourceSlot):
**说明**: 运行时会自动转换
前端传递:
```json
{
"resource": "plate_1" // 字符串ID
"resource": "plate_1" // 字符串ID
}
```
运行时收到:
```python
resource.id # "plate_1"
resource.name # "96孔板"
@@ -977,6 +995,7 @@ resource.type # "resource"
### Q5: 设备加载不了
**检查**:
1. 确认 `class.module` 路径是否正确
2. 确认 Python 驱动类能否正常导入
3. 使用 yaml 验证器检查文件格式
@@ -985,6 +1004,7 @@ resource.type # "resource"
### Q6: 自动生成失败
**检查**:
1. 确认类继承了正确的基类
2. 确保状态方法的返回类型注解清晰
3. 检查类能否被动态导入
@@ -993,6 +1013,7 @@ resource.type # "resource"
### Q7: 前端显示问题
**解决步骤**:
1. 删除旧的 yaml 文件,用编辑器重新生成
2. 清除浏览器缓存,重新加载页面
3. 确认必需字段(如 `schema`)都存在
@@ -1001,6 +1022,7 @@ resource.type # "resource"
### Q8: 动作执行出错
**检查**:
1. 确认动作方法名符合规范(如 `execute_<action_name>`
2. 检查 `goal` 字段的参数映射是否正确
3. 确认方法返回值格式符合 `result` 映射
@@ -1041,7 +1063,7 @@ def transfer(self, r1: ResourceSlot, r2: ResourceSlot):
pass
```
3. **使用Optional表示可选参数**
3. **使用 Optional 表示可选参数**
```python
from typing import Optional
@@ -1063,11 +1085,11 @@ def method(
targets: List[ResourceSlot] # 目标容器列表
) -> Dict[str, Any]:
"""方法说明
Args:
source: 源容器,必须包含足够的液体
targets: 目标容器列表,每个容器应该为空
Returns:
包含操作结果的字典
"""
@@ -1075,6 +1097,7 @@ def method(
```
5. **方法命名规范**
- 状态方法使用 `@property` 装饰器或 `get_` 前缀
- 动作方法使用动词开头
- 保持命名清晰、一致
@@ -1111,8 +1134,6 @@ def method(
- {doc}`add_device` - 设备驱动编写指南
- {doc}`04_add_device_testing` - 设备测试指南
- Python [typing模块](https://docs.python.org/3/library/typing.html)
- [YAML语法](https://yaml.org/)
- Python [typing 模块](https://docs.python.org/3/library/typing.html)
- [YAML 语法](https://yaml.org/)
- [JSON Schema](https://json-schema.org/)

View File

@@ -1,4 +1,4 @@
# 实例电池装配工站接入PLC控制
# 实例电池装配工站接入PLC 控制)
> **文档类型**:实际应用案例
> **适用场景**:使用 PLC 控制的电池装配工站接入
@@ -50,8 +50,6 @@ class CoinCellAssemblyWorkstation(WorkstationBase):
self.client = tcp.register_node_list(self.nodes)
```
## 2. 编写驱动与寄存器读写
### 2.1 寄存器示例
@@ -95,9 +93,9 @@ def start_and_read_metrics(self):
完成工站类与驱动后,需要生成(或更新)工站注册表供系统识别。
### 3.1 新增工站设备(或资源)首次生成注册表
首先通过以下命令启动unilab。进入unilab系统状态检查页面
首先通过以下命令启动 unilab。进入 unilab 系统状态检查页面
```bash
python unilabos\app\main.py -g celljson.json --ak <user的AK> --sk <user的SK>
@@ -112,35 +110,32 @@ python unilabos\app\main.py -g celljson.json --ak <user的AK> --sk <user的SK>
![注册表生成流程](image_battery_plc/unilab_registry_process.png)
步骤说明:
1. 选择新增的工站`coin_cell_assembly.py`文件
2. 点击分析按钮,分析`coin_cell_assembly.py`文件
3. 选择`coin_cell_assembly.py`文件中继承`WorkstationBase`
4. 填写新增的工站.py文件与`unilabos`目录的距离。例如,新增的工站文件`coin_cell_assembly.py`路径为`unilabos\devices\workstation\coin_cell_assembly\coin_cell_assembly.py`,则此处填写`unilabos.devices.workstation.coin_cell_assembly`
4. 填写新增的工站.py 文件与`unilabos`目录的距离。例如,新增的工站文件`coin_cell_assembly.py`路径为`unilabos\devices\workstation\coin_cell_assembly\coin_cell_assembly.py`,则此处填写`unilabos.devices.workstation.coin_cell_assembly`
5. 此处填写新定义工站的类的名字(名称可以自拟)
6. 填写新的工站注册表备注信息
7. 生成注册表
以上操作步骤完成则会生成的新的注册表YAML文件如下图
以上操作步骤完成,则会生成的新的注册表 YAML 文件,如下图:
![生成的YAML文件](image_battery_plc/unilab_new_yaml.png)
### 3.2 添加新生成注册表
`unilabos\registry\devices`目录下新建一个yaml文件此处新建文件命名为`coincellassemblyworkstation_device.yaml`,将上面生成的新的注册表信息粘贴到`coincellassemblyworkstation_device.yaml`文件中。
`unilabos\registry\devices`目录下新建一个 yaml 文件,此处新建文件命名为`coincellassemblyworkstation_device.yaml`,将上面生成的新的注册表信息粘贴到`coincellassemblyworkstation_device.yaml`文件中。
在终端输入以下命令进行注册表补全操作。
```bash
python unilabos\app\register.py --complete_registry
```
### 3.3 启动并上传注册表
新增设备之后启动unilab需要增加`--upload_registry`参数,来上传注册表信息。
新增设备之后,启动 unilab 需要增加`--upload_registry`参数,来上传注册表信息。
```bash
python unilabos\app\main.py -g celljson.json --ak <user的AK> --sk <user的SK> --upload_registry
@@ -159,6 +154,7 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC
### 4.2 首次接入流程
首次新增设备(或资源)需要完整流程:
1. ✅ 在网页端生成注册表信息
2. ✅ 使用 `--complete_registry` 补全注册表
3. ✅ 使用 `--upload_registry` 上传注册表信息
@@ -166,11 +162,12 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC
### 4.3 驱动更新流程
如果不是新增设备,仅修改了工站驱动的 `.py` 文件:
1. ✅ 运行 `--complete_registry` 补全注册表
2. ✅ 运行 `--upload_registry` 上传注册表
3. ❌ 不需要在网页端重新生成注册表
### 4.4 PLC通信注意事项
### 4.4 PLC 通信注意事项
- **握手机制**:若需参数下发,建议在 PLC 端设置标志寄存器并完成握手复位,避免粘连与竞争
- **字节序**FLOAT32 等多字节数据类型需要正确指定字节序(如 `WorderOrder.LITTLE`
@@ -203,5 +200,3 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC
5. ✅ 新增设备与更新驱动的区别
这个案例展示了完整的 PLC 设备接入流程,可以作为其他类似设备接入的参考模板。

View File

@@ -16,8 +16,8 @@
这类工站由开发者自研,组合所有子设备和实验耗材、希望让他们在工作站这一级协调配合;
1. 工作站包含大量已经注册的子设备,可能各自通信组态很不相同;部分设备可能会拥有同一个通信设备作为出口,如2个泵共用1个串口、所有设备共同接入PLC等。
2. 任务系统是统一实现的 protocolsprotocols 中会将高层指令处理成各子设备配合的工作流 json并管理执行、同时更改物料信息
1. 工作站包含大量已经注册的子设备,可能各自通信组态很不相同;部分设备可能会拥有同一个通信设备作为出口,如 2 个泵共用 1 个串口、所有设备共同接入 PLC 等。
2. 任务系统是统一实现的 protocolsprotocols 中会将高层指令处理成各子设备配合的工作流 json 并管理执行、同时更改物料信息
3. 物料系统较为简单直接,如常量有机化学仅为工作站内固定的瓶子,初始化时就已固定;随后在任务执行过程中,记录试剂量更改信息
### 0.2 移液工作站:物料系统和工作流模板管理
@@ -35,7 +35,7 @@
由厂家开发,具备完善的物料系统、任务系统甚至调度系统;由 PLC 或 OpenAPI TCP 协议统一通信
1. 在监控状态时,希望展现子设备的状态;但子设备仅为逻辑概念,通信由工作站上位机接口提供;部分情况下,子设备状态是被记录在文件中的,需要读取
2. 工作站有自己的工作流系统甚至调度系统;可以通过脚本/PLC连续读写来配置工作站可用的工作流
2. 工作站有自己的工作流系统甚至调度系统;可以通过脚本/PLC 连续读写来配置工作站可用的工作流;
3. 部分拥有完善的物料入库、出库、过程记录,需要与 Uni-Lab-OS 物料系统对接
## 1. 整体架构图
@@ -49,7 +49,7 @@ graph TB
RPN[ROS2WorkstationNode<br/>Protocol执行引擎]
WB -.post_init关联.-> RPN
end
subgraph "物料管理系统"
DECK[Deck<br/>PLR本地物料系统]
RS[ResourceSynchronizer<br/>外部物料同步器]
@@ -57,7 +57,7 @@ graph TB
WB --> RS
RS --> DECK
end
subgraph "通信与子设备管理"
HW[hardware_interface<br/>硬件通信接口]
SUBDEV[子设备集合<br/>pumps/grippers/sensors]
@@ -65,7 +65,7 @@ graph TB
RPN --> SUBDEV
HW -.代理模式.-> RPN
end
subgraph "工作流任务系统"
PROTO[Protocol定义<br/>LiquidHandling/PlateHandling]
WORKFLOW[Workflow执行器<br/>步骤管理与编排]
@@ -85,32 +85,32 @@ graph LR
HW2[通信接口<br/>hardware_interface]
HTTP[HTTP服务<br/>WorkstationHTTPService]
end
subgraph "外部物料系统"
BIOYOND[Bioyond物料管理]
LIMS[LIMS系统]
WAREHOUSE[第三方仓储]
end
subgraph "外部硬件系统"
PLC[PLC设备]
SERIAL[串口设备]
ROBOT[机械臂/机器人]
end
subgraph "云端系统"
CLOUD[UniLab云端<br/>资源管理]
MONITOR[监控与调度]
end
BIOYOND <-->|RPC双向同步| DECK2
LIMS -->|HTTP报送| HTTP
WAREHOUSE <-->|API对接| DECK2
PLC <-->|Modbus TCP| HW2
SERIAL <-->|串口通信| HW2
ROBOT <-->|SDK/API| HW2
WS -->|ROS消息| CLOUD
CLOUD -->|任务下发| WS
MONITOR -->|状态查询| WS
@@ -123,40 +123,40 @@ graph TB
subgraph "工作站基类"
BASE[WorkstationBase<br/>抽象基类]
end
subgraph "Bioyond集成工作站"
BW[BioyondWorkstation]
BW_DECK[Deck + Warehouses]
BW_SYNC[BioyondResourceSynchronizer]
BW_HW[BioyondV1RPC]
BW_HTTP[HTTP报送服务]
BW --> BW_DECK
BW --> BW_SYNC
BW --> BW_HW
BW --> BW_HTTP
end
subgraph "纯协议节点"
PN[ProtocolNode]
PN_SUB[子设备集合]
PN_PROTO[Protocol工作流]
PN --> PN_SUB
PN --> PN_PROTO
end
subgraph "PLC控制工作站"
PW[PLCWorkstation]
PW_DECK[Deck物料系统]
PW_PLC[Modbus PLC客户端]
PW_WF[工作流定义]
PW --> PW_DECK
PW --> PW_PLC
PW --> PW_WF
end
BASE -.继承.-> BW
BASE -.继承.-> PN
BASE -.继承.-> PW
@@ -175,25 +175,25 @@ classDiagram
+hardware_interface: Union[Any, str]
+current_workflow_status: WorkflowStatus
+supported_workflows: Dict[str, WorkflowInfo]
+post_init(ros_node)*
+set_hardware_interface(interface)
+call_device_method(method, *args, **kwargs)
+get_device_status()
+is_device_available()
+get_deck()
+get_all_resources()
+find_resource_by_name(name)
+find_resources_by_type(type)
+sync_with_external_system()
+execute_workflow(name, params)
+stop_workflow(emergency)
+workflow_status
+is_busy
}
class ROS2WorkstationNode {
+device_id: str
+children: Dict[str, Any]
@@ -202,7 +202,7 @@ classDiagram
+_action_clients: Dict
+_action_servers: Dict
+resource_tracker: DeviceNodeResourceTracker
+initialize_device(device_id, config)
+create_ros_action_server(action_name, mapping)
+execute_single_action(device_id, action, kwargs)
@@ -210,14 +210,14 @@ classDiagram
+transfer_resource_to_another(resources, target, sites)
+_setup_hardware_proxy(device, comm_device, read, write)
}
%% 物料管理相关类
class Deck {
+name: str
+children: List
+assign_child_resource()
}
class ResourceSynchronizer {
<<abstract>>
+workstation: WorkstationBase
@@ -225,23 +225,23 @@ classDiagram
+sync_to_external(plr_resource)*
+handle_external_change(change_info)*
}
class BioyondResourceSynchronizer {
+bioyond_api_client: BioyondV1RPC
+sync_interval: int
+last_sync_time: float
+initialize()
+sync_from_external()
+sync_to_external(resource)
+handle_external_change(change_info)
}
%% 硬件接口相关类
class HardwareInterface {
<<interface>>
}
class BioyondV1RPC {
+base_url: str
+api_key: str
@@ -249,7 +249,7 @@ classDiagram
+add_material()
+material_inbound()
}
%% 服务类
class WorkstationHTTPService {
+workstation: WorkstationBase
@@ -257,7 +257,7 @@ classDiagram
+port: int
+server: HTTPServer
+running: bool
+start()
+stop()
+_handle_step_finish_report()
@@ -266,13 +266,13 @@ classDiagram
+_handle_material_change_report()
+_handle_error_handling_report()
}
%% 具体实现类
class BioyondWorkstation {
+bioyond_config: Dict
+workflow_mappings: Dict
+workflow_sequence: List
+post_init(ros_node)
+transfer_resource_to_another()
+resource_tree_add(resources)
@@ -280,25 +280,25 @@ classDiagram
+get_all_workflows()
+get_bioyond_status()
}
class ProtocolNode {
+post_init(ros_node)
}
%% 核心关系
WorkstationBase o-- ROS2WorkstationNode : post_init关联
WorkstationBase o-- WorkstationHTTPService : 可选服务
%% 物料管理侧
WorkstationBase *-- Deck : deck
WorkstationBase *-- ResourceSynchronizer : 可选组合
ResourceSynchronizer <|-- BioyondResourceSynchronizer
%% 硬件接口侧
WorkstationBase o-- HardwareInterface : hardware_interface
HardwareInterface <|.. BioyondV1RPC : 实现
BioyondResourceSynchronizer --> BioyondV1RPC : 使用
%% 继承关系
BioyondWorkstation --|> WorkstationBase
ProtocolNode --|> WorkstationBase
@@ -316,49 +316,49 @@ sequenceDiagram
participant HW as HardwareInterface
participant ROS as ROS2WorkstationNode
participant HTTP as HTTPService
APP->>WS: 创建工作站实例(__init__)
WS->>DECK: 初始化PLR Deck
DECK->>DECK: 创建Warehouse等子资源
DECK-->>WS: Deck创建完成
WS->>HW: 创建硬件接口(如BioyondV1RPC)
HW->>HW: 建立连接(PLC/RPC/串口等)
HW-->>WS: 硬件接口就绪
WS->>SYNC: 创建ResourceSynchronizer(可选)
SYNC->>HW: 使用hardware_interface
SYNC->>SYNC: 初始化同步配置
SYNC-->>WS: 同步器创建完成
WS->>SYNC: sync_from_external()
SYNC->>HW: 查询外部物料系统
HW-->>SYNC: 返回物料数据
SYNC->>DECK: 转换并添加到Deck
SYNC-->>WS: 同步完成
Note over WS: __init__完成,等待ROS节点
APP->>ROS: 初始化ROS2WorkstationNode
ROS->>ROS: 初始化子设备(children)
ROS->>ROS: 创建Action客户端
ROS->>ROS: 设置硬件接口代理
ROS-->>APP: ROS节点就绪
APP->>WS: post_init(ros_node)
WS->>WS: self._ros_node = ros_node
WS->>ROS: update_resource([deck])
ROS->>ROS: 上传物料到云端
ROS-->>WS: 上传完成
WS->>HTTP: 创建WorkstationHTTPService(可选)
HTTP->>HTTP: 启动HTTP服务器线程
HTTP-->>WS: HTTP服务启动
WS-->>APP: 工作站完全就绪
```
## 4. 工作流执行时序图Protocol模式
## 4. 工作流执行时序图Protocol 模式)
```{mermaid}
sequenceDiagram
@@ -369,15 +369,15 @@ sequenceDiagram
participant DECK as PLR Deck
participant CLOUD as 云端资源管理
participant DEV as 子设备
CLIENT->>ROS: 发送Protocol Action请求
ROS->>ROS: execute_protocol回调
ROS->>ROS: 从Goal提取参数
ROS->>ROS: 调用protocol_steps_generator
ROS->>ROS: 生成action步骤列表
ROS->>WS: 更新workflow_status = RUNNING
loop 执行每个步骤
alt 调用子设备
ROS->>ROS: execute_single_action(device_id, action, params)
@@ -398,19 +398,19 @@ sequenceDiagram
end
WS-->>ROS: 返回结果
end
ROS->>DECK: 更新本地物料状态
DECK->>DECK: 修改PLR资源属性
end
ROS->>CLOUD: 同步物料到云端(可选)
CLOUD-->>ROS: 同步完成
ROS->>WS: 更新workflow_status = COMPLETED
ROS-->>CLIENT: 返回Protocol Result
```
## 5. HTTP报送处理时序图
## 5. HTTP 报送处理时序图
```{mermaid}
sequenceDiagram
@@ -420,25 +420,25 @@ sequenceDiagram
participant DECK as PLR Deck
participant SYNC as ResourceSynchronizer
participant CLOUD as 云端
EXT->>HTTP: POST /report/step_finish
HTTP->>HTTP: 解析请求数据
HTTP->>HTTP: 验证LIMS协议字段
HTTP->>WS: process_step_finish_report(request)
WS->>WS: 增加接收计数(_reports_received_count++)
WS->>WS: 记录步骤完成事件
WS->>DECK: 更新相关物料状态(可选)
DECK->>DECK: 修改PLR资源状态
WS->>WS: 保存报送记录到内存
WS-->>HTTP: 返回处理结果
HTTP->>HTTP: 构造HTTP响应
HTTP-->>EXT: 200 OK + acknowledgment_id
Note over EXT,CLOUD: 类似处理sample_finish, order_finish等报送
alt 物料变更报送
EXT->>HTTP: POST /report/material_change
HTTP->>WS: process_material_change_report(data)
@@ -463,7 +463,7 @@ sequenceDiagram
participant HW as HardwareInterface
participant HTTP as HTTPService
participant LOG as 日志系统
alt 设备错误(ROS Action失败)
DEV->>ROS: Action返回失败结果
ROS->>ROS: 记录错误信息
@@ -475,7 +475,7 @@ sequenceDiagram
WS->>WS: 记录错误历史
WS->>LOG: 记录错误日志
end
alt 关键错误需要停止
WS->>ROS: stop_workflow(emergency=True)
ROS->>ROS: 取消所有进行中的Action
@@ -487,44 +487,44 @@ sequenceDiagram
WS->>ROS: 触发重试逻辑(可选)
ROS->>DEV: 重新发送Action
end
WS-->>HTTP: 返回错误处理结果
HTTP-->>DEV: 200 OK + 处理状态
```
## 7. 典型工作站实现示例
### 7.1 Bioyond集成工作站实现
### 7.1 Bioyond 集成工作站实现
```python
class BioyondWorkstation(WorkstationBase):
def __init__(self, bioyond_config: Dict, deck: Deck, *args, **kwargs):
# 初始化deck
super().__init__(deck=deck, *args, **kwargs)
# 设置硬件接口为Bioyond RPC客户端
self.hardware_interface = BioyondV1RPC(bioyond_config)
# 创建资源同步器
self.resource_synchronizer = BioyondResourceSynchronizer(self)
# 从Bioyond同步物料到本地deck
self.resource_synchronizer.sync_from_external()
# 配置工作流
self.workflow_mappings = bioyond_config.get("workflow_mappings", {})
def post_init(self, ros_node: ROS2WorkstationNode):
"""ROS节点就绪后的初始化"""
self._ros_node = ros_node
# 上传deck(包括所有物料)到云端
ROS2DeviceNode.run_async_func(
self._ros_node.update_resource,
True,
self._ros_node.update_resource,
True,
resources=[self.deck]
)
def resource_tree_add(self, resources: List[ResourcePLR]):
"""添加物料并同步到Bioyond"""
for resource in resources:
@@ -537,24 +537,24 @@ class BioyondWorkstation(WorkstationBase):
```python
class ProtocolNode(WorkstationBase):
"""纯协议节点,不需要物料管理和外部通信"""
def __init__(self, deck: Optional[Deck] = None, *args, **kwargs):
super().__init__(deck=deck, *args, **kwargs)
# 不设置hardware_interface和resource_synchronizer
# 所有功能通过子设备协同完成
def post_init(self, ros_node: ROS2WorkstationNode):
self._ros_node = ros_node
# 不需要上传物料或其他初始化
```
### 7.3 PLC直接控制工作站
### 7.3 PLC 直接控制工作站
```python
class PLCWorkstation(WorkstationBase):
def __init__(self, plc_config: Dict, deck: Deck, *args, **kwargs):
super().__init__(deck=deck, *args, **kwargs)
# 设置硬件接口为Modbus客户端
from pymodbus.client import ModbusTcpClient
self.hardware_interface = ModbusTcpClient(
@@ -562,7 +562,7 @@ class PLCWorkstation(WorkstationBase):
port=plc_config["port"]
)
self.hardware_interface.connect()
# 定义支持的工作流
self.supported_workflows = {
"battery_assembly": WorkflowInfo(
@@ -574,49 +574,49 @@ class PLCWorkstation(WorkstationBase):
parameters_schema={"quantity": int, "model": str}
)
}
def execute_workflow(self, workflow_name: str, parameters: Dict):
"""通过PLC执行工作流"""
workflow_id = self._get_workflow_id(workflow_name)
# 写入PLC寄存器启动工作流
self.hardware_interface.write_register(100, workflow_id)
self.hardware_interface.write_register(101, parameters["quantity"])
self.current_workflow_status = WorkflowStatus.RUNNING
return True
```
## 8. 核心接口说明
### 8.1 WorkstationBase核心属性
### 8.1 WorkstationBase 核心属性
| 属性 | 类型 | 说明 |
| --------------------------- | ----------------------- | ----------------------------- |
| `_ros_node` | ROS2WorkstationNode | ROS节点引用由post_init设置 |
| `deck` | Deck | PyLabRobot Deck本地物料系统 |
| `plr_resources` | Dict[str, PLRResource] | 物料资源映射 |
| `resource_synchronizer` | ResourceSynchronizer | 外部物料同步器(可选) |
| `hardware_interface` | Union[Any, str] | 硬件接口或代理字符串 |
| `current_workflow_status` | WorkflowStatus | 当前工作流状态 |
| `supported_workflows` | Dict[str, WorkflowInfo] | 支持的工作流定义 |
| 属性 | 类型 | 说明 |
| ------------------------- | ----------------------- | ------------------------------- |
| `_ros_node` | ROS2WorkstationNode | ROS 节点引用,由 post_init 设置 |
| `deck` | Deck | PyLabRobot Deck本地物料系统 |
| `plr_resources` | Dict[str, PLRResource] | 物料资源映射 |
| `resource_synchronizer` | ResourceSynchronizer | 外部物料同步器(可选) |
| `hardware_interface` | Union[Any, str] | 硬件接口或代理字符串 |
| `current_workflow_status` | WorkflowStatus | 当前工作流状态 |
| `supported_workflows` | Dict[str, WorkflowInfo] | 支持的工作流定义 |
### 8.2 必须实现的方法
- `post_init(ros_node)`: ROS节点就绪后的初始化必须实现
- `post_init(ros_node)`: ROS 节点就绪后的初始化,必须实现
### 8.3 硬件接口相关方法
- `set_hardware_interface(interface)`: 设置硬件接口
- `call_device_method(method, *args, **kwargs)`: 统一设备方法调用
- 支持直接模式: 直接调用hardware_interface的方法
- 支持代理模式: hardware_interface="proxy:device_id"通过ROS转发
- 支持直接模式: 直接调用 hardware_interface 的方法
- 支持代理模式: hardware_interface="proxy:device_id"通过 ROS 转发
- `get_device_status()`: 获取设备状态
- `is_device_available()`: 检查设备可用性
### 8.4 物料管理方法
- `get_deck()`: 获取PLR Deck
- `get_deck()`: 获取 PLR Deck
- `get_all_resources()`: 获取所有物料
- `find_resource_by_name(name)`: 按名称查找物料
- `find_resources_by_type(type)`: 按类型查找物料
@@ -630,7 +630,7 @@ class PLCWorkstation(WorkstationBase):
- `is_busy`: 检查是否忙碌(属性)
- `workflow_runtime`: 获取运行时间(属性)
### 8.6 可选的HTTP报送处理方法
### 8.6 可选的 HTTP 报送处理方法
- `process_step_finish_report()`: 步骤完成处理
- `process_sample_finish_report()`: 样本完成处理
@@ -638,10 +638,10 @@ class PLCWorkstation(WorkstationBase):
- `process_material_change_report()`: 物料变更处理
- `handle_external_error()`: 错误处理
### 8.7 ROS2WorkstationNode核心方法
### 8.7 ROS2WorkstationNode 核心方法
- `initialize_device(device_id, config)`: 初始化子设备
- `create_ros_action_server(action_name, mapping)`: 创建Action服务器
- `create_ros_action_server(action_name, mapping)`: 创建 Action 服务器
- `execute_single_action(device_id, action, kwargs)`: 执行单个动作
- `update_resource(resources)`: 同步物料到云端
- `transfer_resource_to_another(...)`: 跨设备物料转移
@@ -698,7 +698,7 @@ workstation = BioyondWorkstation(
"config": {...}
},
"gripper_1": {
"type": "device",
"type": "device",
"driver": "RobotiqGripperDriver",
"communication": "io_modbus_1",
"config": {...}
@@ -720,7 +720,7 @@ workstation = BioyondWorkstation(
}
```
### 9.3 HTTP服务配置
### 9.3 HTTP 服务配置
```python
from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService
@@ -741,31 +741,31 @@ http_service.start()
### 10.1 清晰的职责分离
- **WorkstationBase**: 负责物料管理(deck)、硬件接口(hardware_interface)、工作流状态管理
- **ROS2WorkstationNode**: 负责子设备管理、Protocol执行、云端物料同步
- **ResourceSynchronizer**: 可选的外部物料系统同步(如Bioyond)
- **WorkstationHTTPService**: 可选的HTTP报送接收服务
- **ROS2WorkstationNode**: 负责子设备管理、Protocol 执行、云端物料同步
- **ResourceSynchronizer**: 可选的外部物料系统同步(如 Bioyond)
- **WorkstationHTTPService**: 可选的 HTTP 报送接收服务
### 10.2 灵活的硬件接口模式
1. **直接模式**: hardware_interface是具体对象(如BioyondV1RPC、ModbusClient)
2. **代理模式**: hardware_interface="proxy:device_id"通过ROS节点转发到子设备
1. **直接模式**: hardware_interface 是具体对象(如 BioyondV1RPC、ModbusClient)
2. **代理模式**: hardware_interface="proxy:device_id",通过 ROS 节点转发到子设备
3. **混合模式**: 工作站有自己的接口,同时管理多个子设备
### 10.3 统一的物料系统
- 基于PyLabRobot Deck的标准化物料表示
- 通过ResourceSynchronizer实现与外部系统(如Bioyond、LIMS)的双向同步
- 通过ROS2WorkstationNode实现与云端的物料状态同步
- 基于 PyLabRobot Deck 的标准化物料表示
- 通过 ResourceSynchronizer 实现与外部系统(如 Bioyond、LIMS)的双向同步
- 通过 ROS2WorkstationNode 实现与云端的物料状态同步
### 10.4 Protocol驱动的工作流
### 10.4 Protocol 驱动的工作流
- ROS2WorkstationNode负责Protocol的执行和步骤管理
- 支持子设备协同(通过Action Client调用)
- 支持工作站直接控制(通过hardware_interface)
- ROS2WorkstationNode 负责 Protocol 的执行和步骤管理
- 支持子设备协同(通过 Action Client 调用)
- 支持工作站直接控制(通过 hardware_interface)
### 10.5 可选的HTTP报送服务
### 10.5 可选的 HTTP 报送服务
- 基于LIMS协议规范的统一报送接口
- 基于 LIMS 协议规范的统一报送接口
- 支持步骤完成、样本完成、任务完成、物料变更等多种报送类型
- 与工作站解耦,可独立启停

View File

@@ -592,4 +592,3 @@ ros2 topic list
- [ROS2 网络配置](https://docs.ros.org/en/humble/Tutorials/Advanced/Networking.html)
- [DDS 配置](https://fast-dds.docs.eprosima.com/)
- Uni-Lab 云平台文档

View File

@@ -1,161 +1,156 @@
import argparse
import os
import time
from datetime import datetime
from typing import Dict, Optional, Tuple
import requests
from unilabos.config.config import OSSUploadConfig
from unilabos.app.web.client import http_client, HTTPClient
from unilabos.utils import logger
def _init_upload(file_path: str, oss_path: str, filename: Optional[str] = None,
process_key: str = "file-upload", device_id: str = "default",
expires_hours: int = 1) -> Tuple[bool, Dict]:
def _get_oss_token(
filename: str,
driver_name: str = "default",
exp_type: str = "default",
client: Optional[HTTPClient] = None,
) -> Tuple[bool, Dict]:
"""
初始化上传过程
获取OSS上传Token
Args:
file_path: 本地文件路径
oss_path: OSS目标路径
filename: 文件名如果为None则使用file_path的文件名
process_key: 处理键
device_id: 设备ID
expires_hours: 链接过期小时数
filename: 文件名
driver_name: 驱动名称
exp_type: 实验类型
client: HTTPClient实例如果不提供则使用默认的http_client
Returns:
(成功标志, 响应数据)
(成功标志, Token数据字典包含token/path/host/expires)
"""
if filename is None:
filename = os.path.basename(file_path)
# 使用提供的client或默认的http_client
if client is None:
client = http_client
# 构造初始化请求
url = f"{OSSUploadConfig.api_host}{OSSUploadConfig.init_endpoint}"
headers = {
"Authorization": OSSUploadConfig.authorization,
"Content-Type": "application/json"
}
# 构造scene参数: driver_name-exp_type
scene = f"{driver_name}-{exp_type}"
payload = {
"device_id": device_id,
"process_key": process_key,
"filename": filename,
"path": oss_path,
"expires_hours": expires_hours
}
# 构造请求URL使用client的remote_addr已包含/api/v1/
url = f"{client.remote_addr}/applications/token"
params = {"scene": scene, "filename": filename}
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 201:
result = response.json()
if result.get("code") == "10000":
return True, result.get("data", {})
logger.info(f"[OSS] 请求预签名URL: scene={scene}, filename={filename}")
response = requests.get(url, params=params, headers={"Authorization": f"Lab {client.auth}"}, timeout=10)
print(f"初始化上传失败: {response.status_code}, {response.text}")
if response.status_code == 200:
result = response.json()
if result.get("code") == 0:
data = result.get("data", {})
# 转换expires时间戳为可读格式
expires_timestamp = data.get("expires", 0)
expires_datetime = datetime.fromtimestamp(expires_timestamp)
expires_str = expires_datetime.strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"[OSS] 获取预签名URL成功")
logger.info(f"[OSS] - URL: {data.get('url', 'N/A')}")
logger.info(f"[OSS] - Expires: {expires_str} (timestamp: {expires_timestamp})")
return True, data
logger.error(f"[OSS] 获取预签名URL失败: {response.status_code}, {response.text}")
return False, {}
except Exception as e:
print(f"初始化上传异常: {str(e)}")
logger.error(f"[OSS] 获取预签名URL异常: {str(e)}")
return False, {}
def _put_upload(file_path: str, upload_url: str) -> bool:
"""
执行PUT上传
使用预签名URL上传文件到OSS
Args:
file_path: 本地文件路径
upload_url: 上传URL
upload_url: 完整的预签名上传URL
Returns:
是否成功
"""
try:
logger.info(f"[OSS] 开始上传文件: {file_path}")
with open(file_path, "rb") as f:
response = requests.put(upload_url, data=f)
# 使用预签名URL上传不需要额外的认证header
response = requests.put(upload_url, data=f, timeout=300)
if response.status_code == 200:
logger.info(f"[OSS] 文件上传成功")
return True
print(f"PUT上传失败: {response.status_code}, {response.text}")
logger.error(f"[OSS] 上传失败: {response.status_code}")
logger.error(f"[OSS] 响应内容: {response.text[:500] if response.text else '无响应内容'}")
return False
except Exception as e:
print(f"PUT上传异常: {str(e)}")
logger.error(f"[OSS] 上传异常: {str(e)}")
return False
def _complete_upload(uuid: str) -> bool:
"""
完成上传过程
Args:
uuid: 上传的UUID
Returns:
是否成功
"""
url = f"{OSSUploadConfig.api_host}{OSSUploadConfig.complete_endpoint}"
headers = {
"Authorization": OSSUploadConfig.authorization,
"Content-Type": "application/json"
}
payload = {
"uuid": uuid
}
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
result = response.json()
if result.get("code") == "10000":
return True
print(f"完成上传失败: {response.status_code}, {response.text}")
return False
except Exception as e:
print(f"完成上传异常: {str(e)}")
return False
def oss_upload(file_path: str, oss_path: str, filename: Optional[str] = None,
process_key: str = "file-upload", device_id: str = "default") -> bool:
def oss_upload(
file_path: str,
filename: Optional[str] = None,
driver_name: str = "default",
exp_type: str = "default",
max_retries: int = 3,
client: Optional[HTTPClient] = None,
) -> Dict:
"""
文件上传主函数,包含重试机制
Args:
file_path: 本地文件路径
oss_path: OSS目标路径
filename: 文件名如果为None则使用file_path的文件名
process_key: 处理键
device_id: 设备ID
driver_name: 驱动名称用于构造scene
exp_type: 实验类型用于构造scene
max_retries: 最大重试次数
client: HTTPClient实例如果不提供则使用默认的http_client
Returns:
是否成功上传
Dict: {
"success": bool, # 是否上传成功
"original_path": str, # 原始文件路径
"oss_path": str # OSS路径成功时或空字符串失败时
}
"""
max_retries = OSSUploadConfig.max_retries
if filename is None:
filename = os.path.basename(file_path)
if not os.path.exists(file_path):
logger.error(f"[OSS] 文件不存在: {file_path}")
return {"success": False, "original_path": file_path, "oss_path": ""}
retry_count = 0
oss_path = ""
while retry_count < max_retries:
try:
# 步骤1初始化上传
init_success, init_data = _init_upload(
file_path=file_path,
oss_path=oss_path,
filename=filename,
process_key=process_key,
device_id=device_id
# 步骤1获取预签名URL
token_success, token_data = _get_oss_token(
filename=filename, driver_name=driver_name, exp_type=exp_type, client=client
)
if not init_success:
print(f"初始化上传失败,重试 {retry_count + 1}/{max_retries}")
if not token_success:
logger.warning(f"[OSS] 获取预签名URL失败,重试 {retry_count + 1}/{max_retries}")
retry_count += 1
time.sleep(1) # 等待1秒后重试
time.sleep(1)
continue
# 获取UUID和上传URL
uuid = init_data.get("uuid")
upload_url = init_data.get("upload_url")
# 获取预签名URL和OSS路径
upload_url = token_data.get("url")
oss_path = token_data.get("path", "")
if not uuid or not upload_url:
print(f"初始化上传返回数据不完整,重试 {retry_count + 1}/{max_retries}")
if not upload_url:
logger.warning(f"[OSS] 无法获取上传URLAPI未返回url字段")
retry_count += 1
time.sleep(1)
continue
@@ -163,69 +158,82 @@ def oss_upload(file_path: str, oss_path: str, filename: Optional[str] = None,
# 步骤2PUT上传文件
put_success = _put_upload(file_path, upload_url)
if not put_success:
print(f"PUT上传失败重试 {retry_count + 1}/{max_retries}")
retry_count += 1
time.sleep(1)
continue
# 步骤3完成上传
complete_success = _complete_upload(uuid)
if not complete_success:
print(f"完成上传失败,重试 {retry_count + 1}/{max_retries}")
logger.warning(f"[OSS] PUT上传失败重试 {retry_count + 1}/{max_retries}")
retry_count += 1
time.sleep(1)
continue
# 所有步骤都成功
print(f"文件 {file_path} 上传成功")
return True
logger.info(f"[OSS] 文件 {file_path} 上传成功")
return {"success": True, "original_path": file_path, "oss_path": oss_path}
except Exception as e:
print(f"上传过程异常: {str(e)},重试 {retry_count + 1}/{max_retries}")
logger.error(f"[OSS] 上传过程异常: {str(e)},重试 {retry_count + 1}/{max_retries}")
retry_count += 1
time.sleep(1)
print(f"文件 {file_path} 上传失败,已达到最大重试次数 {max_retries}")
return False
logger.error(f"[OSS] 文件 {file_path} 上传失败,已达到最大重试次数 {max_retries}")
return {"success": False, "original_path": file_path, "oss_path": oss_path}
if __name__ == "__main__":
# python -m unilabos.app.oss_upload -f /path/to/your/file.txt
# python -m unilabos.app.oss_upload -f /path/to/your/file.txt --driver HPLC --type test
# python -m unilabos.app.oss_upload -f /path/to/your/file.txt --driver HPLC --type test \
# --ak xxx --sk yyy --remote-addr http://xxx/api/v1
# 命令行参数解析
parser = argparse.ArgumentParser(description='文件上传测试工具')
parser.add_argument('--file', '-f', type=str, required=True, help='要上传的本地文件路径')
parser.add_argument('--path', '-p', type=str, default='/HPLC1/Any', help='OSS目标路径')
parser.add_argument('--device', '-d', type=str, default='test-device', help='设备ID')
parser.add_argument('--process', '-k', type=str, default='HPLC-txt-result', help='处理键')
parser = argparse.ArgumentParser(description="文件上传测试工具")
parser.add_argument("--file", "-f", type=str, required=True, help="要上传的本地文件路径")
parser.add_argument("--driver", "-d", type=str, default="default", help="驱动名称")
parser.add_argument("--type", "-t", type=str, default="default", help="实验类型")
parser.add_argument("--ak", type=str, help="Access Key如果提供则覆盖配置")
parser.add_argument("--sk", type=str, help="Secret Key如果提供则覆盖配置")
parser.add_argument("--remote-addr", type=str, help="远程服务器地址(包含/api/v1如果提供则覆盖配置")
args = parser.parse_args()
# 检查文件是否存在
if not os.path.exists(args.file):
print(f"错误:文件 {args.file} 不存在")
logger.error(f"错误:文件 {args.file} 不存在")
exit(1)
print("=" * 50)
print(f"开始上传文件: {args.file}")
print(f"目标路径: {args.path}")
print(f"设备ID: {args.device}")
print(f"处理键: {args.process}")
print("=" * 50)
# 如果提供了ak/sk/remote_addr创建临时HTTPClient
temp_client = None
if args.ak and args.sk:
import base64
auth = base64.b64encode(f"{args.ak}:{args.sk}".encode("utf-8")).decode("utf-8")
remote_addr = args.remote_addr if args.remote_addr else http_client.remote_addr
temp_client = HTTPClient(remote_addr=remote_addr, auth=auth)
logger.info(f"[配置] 使用自定义配置: remote_addr={remote_addr}")
elif args.remote_addr:
temp_client = HTTPClient(remote_addr=args.remote_addr, auth=http_client.auth)
logger.info(f"[配置] 使用自定义remote_addr: {args.remote_addr}")
else:
logger.info(f"[配置] 使用默认配置: remote_addr={http_client.remote_addr}")
logger.info("=" * 50)
logger.info(f"开始上传文件: {args.file}")
logger.info(f"驱动名称: {args.driver}")
logger.info(f"实验类型: {args.type}")
logger.info(f"Scene: {args.driver}-{args.type}")
logger.info("=" * 50)
# 执行上传
success = oss_upload(
result = oss_upload(
file_path=args.file,
oss_path=args.path,
filename=None, # 使用默认文件名
process_key=args.process,
device_id=args.device
driver_name=args.driver,
exp_type=args.type,
client=temp_client,
)
# 输出结果
if success:
print("\n√ 文件上传成功!")
if result["success"]:
logger.info(f"\n√ 文件上传成功!")
logger.info(f"原始路径: {result['original_path']}")
logger.info(f"OSS路径: {result['oss_path']}")
exit(0)
else:
print("\n× 文件上传失败!")
logger.error(f"\n× 文件上传失败!")
logger.error(f"原始路径: {result['original_path']}")
exit(1)

View File

@@ -39,15 +39,6 @@ class WSConfig:
ping_interval = 30 # ping间隔
# OSS上传配置
class OSSUploadConfig:
api_host = ""
authorization = ""
init_endpoint = ""
complete_endpoint = ""
max_retries = 3
# HTTP配置
class HTTPConfig:
remote_addr = "http://127.0.0.1:48197/api/v1"

View File

@@ -405,9 +405,19 @@ class RunningResultChecker(DriverChecker):
for i in range(self.driver._finished, temp):
sample_id = self.driver._get_resource_sample_id(self.driver._wf_name, i) # 从0开始计数
pdf, txt = self.driver.get_data_file(i + 1)
device_id = self.driver.device_id if hasattr(self.driver, "device_id") else "default"
oss_upload(pdf, f"hplc/{sample_id}/{os.path.basename(pdf)}", process_key="example", device_id=device_id)
oss_upload(txt, f"hplc/{sample_id}/{os.path.basename(txt)}", process_key="HPLC-txt-result", device_id=device_id)
# 使用新的OSS上传接口传入driver_name和exp_type
pdf_result = oss_upload(pdf, filename=os.path.basename(pdf), driver_name="HPLC", exp_type="analysis")
txt_result = oss_upload(txt, filename=os.path.basename(txt), driver_name="HPLC", exp_type="result")
if pdf_result["success"]:
print(f"PDF上传成功: {pdf_result['oss_path']}")
else:
print(f"PDF上传失败: {pdf_result['original_path']}")
if txt_result["success"]:
print(f"TXT上传成功: {txt_result['oss_path']}")
else:
print(f"TXT上传失败: {txt_result['original_path']}")
# self.driver.extract_data_from_txt()
except Exception as ex:
self.driver._finished = 0
@@ -456,8 +466,12 @@ if __name__ == "__main__":
}
sample_id = obj._get_resource_sample_id("test", 0)
pdf, txt = obj.get_data_file("1", after_time=datetime(2024, 11, 6, 19, 3, 6))
oss_upload(pdf, f"hplc/{sample_id}/{os.path.basename(pdf)}", process_key="example")
oss_upload(txt, f"hplc/{sample_id}/{os.path.basename(txt)}", process_key="HPLC-txt-result")
# 使用新的OSS上传接口传入driver_name和exp_type
pdf_result = oss_upload(pdf, filename=os.path.basename(pdf), driver_name="HPLC", exp_type="analysis")
txt_result = oss_upload(txt, filename=os.path.basename(txt), driver_name="HPLC", exp_type="result")
print(f"PDF上传结果: {pdf_result}")
print(f"TXT上传结果: {txt_result}")
# driver = HPLCDriver()
# for i in range(10000):
# print({k: v for k, v in driver._device_status.items() if isinstance(v, str)})