diff --git a/docs/advanced_usage/configuration.md b/docs/advanced_usage/configuration.md index d1fdb69e..3440044c 100644 --- a/docs/advanced_usage/configuration.md +++ b/docs/advanced_usage/configuration.md @@ -67,14 +67,6 @@ class WSConfig: max_reconnect_attempts = 999 # 最大重连次数 ping_interval = 30 # ping间隔(秒) -# OSS上传配置 -class OSSUploadConfig: - api_host = "" # API主机地址 - authorization = "" # 授权信息 - init_endpoint = "" # 初始化端点 - complete_endpoint = "" # 完成端点 - max_retries = 3 # 最大重试次数 - # HTTP配置 class HTTPConfig: remote_addr = "https://uni-lab.bohrium.com/api/v1" # 远程服务器地址 @@ -294,19 +286,7 @@ HTTP 客户端配置用于与云端服务通信: - UAT 环境:`https://uni-lab.uat.bohrium.com/api/v1` - 本地环境:`http://127.0.0.1:48197/api/v1` -### 4. OSSUploadConfig - OSS 上传配置 - -对象存储服务配置,用于文件上传功能: - -| 参数 | 类型 | 默认值 | 说明 | -| ------------------- | ---- | ------ | -------------------- | -| `api_host` | str | `""` | OSS API 主机地址 | -| `authorization` | str | `""` | 授权认证信息 | -| `init_endpoint` | str | `""` | 上传初始化端点 | -| `complete_endpoint` | str | `""` | 上传完成端点 | -| `max_retries` | int | `3` | 上传失败最大重试次数 | - -### 5. ROSConfig - ROS 配置 +### 4. ROSConfig - ROS 配置 配置 ROS 消息转换器需要加载的模块: diff --git a/docs/developer_guide/add_registry.md b/docs/developer_guide/add_registry.md index 122ffdd5..36caa943 100644 --- a/docs/developer_guide/add_registry.md +++ b/docs/developer_guide/add_registry.md @@ -4,7 +4,8 @@ ## 概述 -注册表(Registry)是Uni-Lab的设备配置系统,采用YAML格式定义设备的: +注册表(Registry)是 Uni-Lab 的设备配置系统,采用 YAML 格式定义设备的: + - 可用动作(Actions) - 状态类型(Status Types) - 初始化参数(Init Parameters) @@ -32,19 +33,19 @@ ### 核心字段说明 -| 字段名 | 类型 | 需要手写 | 说明 | -| ----------------- | ------ | -------- | ----------------------------------- | -| 设备标识符 | string | 是 | 设备的唯一名字,如 `mock_chiller` | -| class | object | 部分 | 设备的核心信息,必须配置 | -| description | string | 否 | 设备描述,系统默认给空字符串 | -| handles | array | 否 | 连接关系,默认为空 | -| icon | string | 否 | 图标路径,默认为空 | -| init_param_schema | object | 否 | 初始化参数,系统自动分析生成 | -| version | string | 否 | 版本号,默认 "1.0.0" | -| category | array | 否 | 设备分类,默认使用文件名 | -| config_info | array | 否 | 嵌套配置,默认为空 | -| file_path | string | 否 | 文件路径,系统自动设置 | -| registry_type | string | 否 | 注册表类型,自动设为 "device" | +| 字段名 | 类型 | 需要手写 | 说明 | +| ----------------- | ------ | -------- | --------------------------------- | +| 设备标识符 | string | 是 | 设备的唯一名字,如 `mock_chiller` | +| class | object | 部分 | 设备的核心信息,必须配置 | +| description | string | 否 | 设备描述,系统默认给空字符串 | +| handles | array | 否 | 连接关系,默认为空 | +| icon | string | 否 | 图标路径,默认为空 | +| init_param_schema | object | 否 | 初始化参数,系统自动分析生成 | +| version | string | 否 | 版本号,默认 "1.0.0" | +| category | array | 否 | 设备分类,默认使用文件名 | +| config_info | array | 否 | 嵌套配置,默认为空 | +| file_path | string | 否 | 文件路径,系统自动设置 | +| registry_type | string | 否 | 注册表类型,自动设为 "device" | ### class 字段详解 @@ -71,11 +72,11 @@ my_device: # 动作配置(详见后文) action_name: type: UniLabJsonCommand - goal: {...} - result: {...} + goal: { ... } + result: { ... } - description: "设备描述" - version: "1.0.0" + description: '设备描述' + version: '1.0.0' category: - device_category handles: [] @@ -101,21 +102,22 @@ my_device: ## 创建注册表的方式 -### 方式1: 使用注册表编辑器(推荐) +### 方式 1: 使用注册表编辑器(推荐) 适合大多数场景,快速高效。 **步骤**: -1. 启动Uni-Lab -2. 访问Web界面的"注册表编辑器" -3. 上传您的Python设备驱动文件 + +1. 启动 Uni-Lab +2. 访问 Web 界面的"注册表编辑器" +3. 上传您的 Python 设备驱动文件 4. 点击"分析文件" 5. 填写描述和图标 6. 点击"生成注册表" -7. 复制生成的YAML内容 +7. 复制生成的 YAML 内容 8. 保存到 `unilabos/registry/devices/your_device.yaml` -### 方式2: 使用--complete_registry参数(开发调试) +### 方式 2: 使用--complete_registry 参数(开发调试) 适合开发阶段,自动补全配置。 @@ -125,7 +127,8 @@ unilab -g dev.json --complete_registry --registry_path ./my_registry ``` 系统会: -1. 扫描Python类 + +1. 扫描 Python 类 2. 分析方法签名和类型 3. 自动生成缺失的字段 4. 保存到注册表文件 @@ -137,7 +140,7 @@ unilab -g dev.json --complete_registry --registry_path ./my_registry 启动系统时用 complete_registry=True 参数,让系统自动补全 ``` -### 方式3: 手动编写(高级) +### 方式 3: 手动编写(高级) 适合需要精细控制或特殊需求的场景。 @@ -186,6 +189,7 @@ my_device: | ROS 动作类型 | 标准 ROS 动作 | goal_default 和 schema | **常用的 ROS 动作类型**: + - `SendCmd`:发送简单命令 - `NavigateThroughPoses`:导航动作 - `SingleJointPosition`:单关节位置控制 @@ -251,11 +255,11 @@ heat_chill_start: ## 特殊类型的自动识别 -### ResourceSlot和DeviceSlot识别 +### ResourceSlot 和 DeviceSlot 识别 当您在驱动代码中使用这些特殊类型时,系统会自动识别并生成相应的前端选择器。 -**Python驱动代码示例**: +**Python 驱动代码示例**: ```python from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot @@ -286,24 +290,24 @@ my_device: device: device devices: devices placeholder_keys: - resource: unilabos_resources # 自动添加! - resources: unilabos_resources # 自动添加! - device: unilabos_devices # 自动添加! - devices: unilabos_devices # 自动添加! + resource: unilabos_resources # 自动添加! + resources: unilabos_resources # 自动添加! + device: unilabos_devices # 自动添加! + devices: unilabos_devices # 自动添加! result: success: success ``` ### 识别规则 -| Python类型 | placeholder_keys值 | 前端效果 | -|-----------|-------------------|---------| -| `ResourceSlot` | `unilabos_resources` | 单选资源下拉框 | +| Python 类型 | placeholder_keys 值 | 前端效果 | +| -------------------- | -------------------- | -------------- | +| `ResourceSlot` | `unilabos_resources` | 单选资源下拉框 | | `List[ResourceSlot]` | `unilabos_resources` | 多选资源下拉框 | -| `DeviceSlot` | `unilabos_devices` | 单选设备下拉框 | -| `List[DeviceSlot]` | `unilabos_devices` | 多选设备下拉框 | +| `DeviceSlot` | `unilabos_devices` | 单选设备下拉框 | +| `List[DeviceSlot]` | `unilabos_devices` | 多选设备下拉框 | -### 前端UI效果 +### 前端 UI 效果 #### 单选资源 @@ -313,6 +317,7 @@ placeholder_keys: ``` **前端渲染**: + ``` Source: [下拉选择框 ▼] ├── plate_1 (96孔板) @@ -329,6 +334,7 @@ placeholder_keys: ``` **前端渲染**: + ``` Targets: [多选下拉框 ▼] ☑ plate_1 (96孔板) @@ -345,6 +351,7 @@ placeholder_keys: ``` **前端渲染**: + ``` Pump: [下拉选择框 ▼] ├── pump_1 (注射泵A) @@ -360,6 +367,7 @@ placeholder_keys: ``` **前端渲染**: + ``` Sync Devices: [多选下拉框 ▼] ☑ heater_1 (加热器A) @@ -367,11 +375,11 @@ Sync Devices: [多选下拉框 ▼] ☐ pump_1 (注射泵) ``` -### 手动配置placeholder_keys +### 手动配置 placeholder_keys -如果需要手动添加或覆盖自动生成的placeholder_keys: +如果需要手动添加或覆盖自动生成的 placeholder_keys: -#### 场景1: 非标准参数名 +#### 场景 1: 非标准参数名 ```yaml action_value_mappings: @@ -384,7 +392,7 @@ action_value_mappings: my_device_param: unilabos_devices ``` -#### 场景2: 混合类型 +#### 场景 2: 混合类型 ```python def mixed_params( @@ -398,32 +406,33 @@ def mixed_params( ```yaml placeholder_keys: - resource: unilabos_resources # 资源选择 - device: unilabos_devices # 设备选择 + resource: unilabos_resources # 资源选择 + device: unilabos_devices # 设备选择 # normal_param不需要placeholder_keys ``` -#### 场景3: 自定义选择器 +#### 场景 3: 自定义选择器 ```yaml placeholder_keys: - special_param: custom_selector # 使用自定义选择器 + special_param: custom_selector # 使用自定义选择器 ``` ## 系统自动生成的字段 ### status_types -系统会扫描你的 Python 类,从状态方法(property或get_方法)自动生成这部分: +系统会扫描你的 Python 类,从状态方法(property 或 get\_方法)自动生成这部分: ```yaml status_types: current_temperature: float # 从 get_current_temperature() 或 @property current_temperature - is_heating: bool # 从 get_is_heating() 或 @property is_heating - status: str # 从 get_status() 或 @property status + is_heating: bool # 从 get_is_heating() 或 @property is_heating + status: str # 从 get_status() 或 @property status ``` **注意事项**: + - 系统会查找所有 `get_` 开头的方法和 `@property` 装饰的属性 - 类型会自动转成相应的类型(如 `str`、`float`、`bool`) - 如果类型是 `Any`、`None` 或未知的,默认使用 `String` @@ -459,20 +468,21 @@ init_param_schema: ``` **生成规则**: + - `config` 部分:分析 `__init__` 方法的参数、类型和默认值 - `data` 部分:根据 `status_types` 生成前端显示用的类型定义 ### 其他自动填充的字段 ```yaml -version: '1.0.0' # 默认版本 -category: ['文件名'] # 使用 yaml 文件名作为类别 -description: '' # 默认为空 -icon: '' # 默认为空 -handles: [] # 默认空数组 -config_info: [] # 默认空数组 +version: '1.0.0' # 默认版本 +category: ['文件名'] # 使用 yaml 文件名作为类别 +description: '' # 默认为空 +icon: '' # 默认为空 +handles: [] # 默认空数组 +config_info: [] # 默认空数组 file_path: '/path/to/file' # 系统自动填写 -registry_type: 'device' # 自动设为设备类型 +registry_type: 'device' # 自动设为设备类型 ``` ### handles 字段 @@ -510,7 +520,7 @@ config_info: # 嵌套配置,用于包含子设备 ## 完整示例 -### Python驱动代码 +### Python 驱动代码 ```python # unilabos/devices/my_lab/liquid_handler.py @@ -520,22 +530,22 @@ from typing import List, Dict, Any, Optional class AdvancedLiquidHandler: """高级液体处理工作站""" - + def __init__(self, config: Dict[str, Any]): self.simulation = config.get('simulation', False) self._status = "idle" self._temperature = 25.0 - + @property def status(self) -> str: """设备状态""" return self._status - + @property def temperature(self) -> float: """当前温度""" return self._temperature - + def transfer( self, source: ResourceSlot, @@ -545,7 +555,7 @@ class AdvancedLiquidHandler: ) -> Dict[str, Any]: """转移液体""" return {"success": True} - + def multi_transfer( self, source: ResourceSlot, @@ -554,7 +564,7 @@ class AdvancedLiquidHandler: ) -> Dict[str, Any]: """多目标转移""" return {"success": True} - + def coordinate_with_heater( self, plate: ResourceSlot, @@ -574,12 +584,12 @@ advanced_liquid_handler: class: module: unilabos.devices.my_lab.liquid_handler:AdvancedLiquidHandler type: python - + # 自动提取的状态类型 status_types: status: str temperature: float - + # 自动生成的初始化参数 init_param_schema: config: @@ -597,7 +607,7 @@ advanced_liquid_handler: required: - status type: object - + # 动作映射 action_value_mappings: transfer: @@ -613,28 +623,28 @@ advanced_liquid_handler: volume: 0.0 tip: null placeholder_keys: - source: unilabos_resources # 自动添加 - target: unilabos_resources # 自动添加 - tip: unilabos_resources # 自动添加 + source: unilabos_resources # 自动添加 + target: unilabos_resources # 自动添加 + tip: unilabos_resources # 自动添加 result: success: success schema: - description: "转移液体" + description: '转移液体' properties: goal: properties: source: type: object - description: "源容器" + description: '源容器' target: type: object - description: "目标容器" + description: '目标容器' volume: type: number - description: "体积(μL)" + description: '体积(μL)' tip: type: object - description: "枪头(可选)" + description: '枪头(可选)' required: - source - target @@ -643,7 +653,7 @@ advanced_liquid_handler: required: - goal type: object - + multi_transfer: type: UniLabJsonCommand goal: @@ -651,11 +661,11 @@ advanced_liquid_handler: targets: targets volumes: volumes placeholder_keys: - source: unilabos_resources # 单选 - targets: unilabos_resources # 多选 + source: unilabos_resources # 单选 + targets: unilabos_resources # 多选 result: success: success - + coordinate_with_heater: type: UniLabJsonCommand goal: @@ -663,17 +673,17 @@ advanced_liquid_handler: heater: heater temperature: temperature placeholder_keys: - plate: unilabos_resources # 资源选择 - heater: unilabos_devices # 设备选择 + plate: unilabos_resources # 资源选择 + heater: unilabos_devices # 设备选择 result: success: success - - description: "高级液体处理工作站,支持多目标转移和设备协同" - version: "1.0.0" + + description: '高级液体处理工作站,支持多目标转移和设备协同' + version: '1.0.0' category: - liquid_handling handles: [] - icon: "" + icon: '' ``` ### 另一个完整示例:温度控制器 @@ -892,17 +902,18 @@ unilab -g dev.json --complete_registry cat unilabos/registry/devices/my_device.yaml ``` -### 2. 验证placeholder_keys +### 2. 验证 placeholder_keys 确认: -- ResourceSlot参数有 `unilabos_resources` -- DeviceSlot参数有 `unilabos_devices` -- List类型被正确识别 + +- ResourceSlot 参数有 `unilabos_resources` +- DeviceSlot 参数有 `unilabos_devices` +- List 类型被正确识别 ### 3. 测试前端效果 -1. 启动Uni-Lab -2. 访问Web界面 +1. 启动 Uni-Lab +2. 访问 Web 界面 3. 选择设备 4. 调用动作 5. 检查是否显示正确的选择器 @@ -916,18 +927,21 @@ python -c "from unilabos.devices.my_module.my_device import MyDevice" ## 常见问题 -### Q1: placeholder_keys没有自动生成 +### Q1: placeholder_keys 没有自动生成 **检查**: + 1. 是否使用了`--complete_registry`参数? 2. 类型注解是否正确? + ```python # ✓ 正确 def method(self, resource: ResourceSlot): - + # ✗ 错误(缺少类型注解) def method(self, resource): ``` + 3. 是否正确导入? ```python from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot @@ -935,9 +949,10 @@ python -c "from unilabos.devices.my_module.my_device import MyDevice" ### Q2: 前端显示普通输入框而不是选择器 -**原因**: placeholder_keys未正确配置 +**原因**: placeholder_keys 未正确配置 **解决**: + ```yaml # 检查YAML中是否有 placeholder_keys: @@ -947,6 +962,7 @@ placeholder_keys: ### Q3: 多选不工作 **检查类型注解**: + ```python # ✓ 正确 - 会生成多选 def method(self, resources: List[ResourceSlot]): @@ -960,13 +976,15 @@ def method(self, resources: ResourceSlot): **说明**: 运行时会自动转换 前端传递: + ```json { - "resource": "plate_1" // 字符串ID + "resource": "plate_1" // 字符串ID } ``` 运行时收到: + ```python resource.id # "plate_1" resource.name # "96孔板" @@ -977,6 +995,7 @@ resource.type # "resource" ### Q5: 设备加载不了 **检查**: + 1. 确认 `class.module` 路径是否正确 2. 确认 Python 驱动类能否正常导入 3. 使用 yaml 验证器检查文件格式 @@ -985,6 +1004,7 @@ resource.type # "resource" ### Q6: 自动生成失败 **检查**: + 1. 确认类继承了正确的基类 2. 确保状态方法的返回类型注解清晰 3. 检查类能否被动态导入 @@ -993,6 +1013,7 @@ resource.type # "resource" ### Q7: 前端显示问题 **解决步骤**: + 1. 删除旧的 yaml 文件,用编辑器重新生成 2. 清除浏览器缓存,重新加载页面 3. 确认必需字段(如 `schema`)都存在 @@ -1001,6 +1022,7 @@ resource.type # "resource" ### Q8: 动作执行出错 **检查**: + 1. 确认动作方法名符合规范(如 `execute_`) 2. 检查 `goal` 字段的参数映射是否正确 3. 确认方法返回值格式符合 `result` 映射 @@ -1041,7 +1063,7 @@ def transfer(self, r1: ResourceSlot, r2: ResourceSlot): pass ``` -3. **使用Optional表示可选参数** +3. **使用 Optional 表示可选参数** ```python from typing import Optional @@ -1063,11 +1085,11 @@ def method( targets: List[ResourceSlot] # 目标容器列表 ) -> Dict[str, Any]: """方法说明 - + Args: source: 源容器,必须包含足够的液体 targets: 目标容器列表,每个容器应该为空 - + Returns: 包含操作结果的字典 """ @@ -1075,6 +1097,7 @@ def method( ``` 5. **方法命名规范** + - 状态方法使用 `@property` 装饰器或 `get_` 前缀 - 动作方法使用动词开头 - 保持命名清晰、一致 @@ -1111,8 +1134,6 @@ def method( - {doc}`add_device` - 设备驱动编写指南 - {doc}`04_add_device_testing` - 设备测试指南 -- Python [typing模块](https://docs.python.org/3/library/typing.html) -- [YAML语法](https://yaml.org/) +- Python [typing 模块](https://docs.python.org/3/library/typing.html) +- [YAML 语法](https://yaml.org/) - [JSON Schema](https://json-schema.org/) - - diff --git a/docs/developer_guide/examples/battery_plc_workstation.md b/docs/developer_guide/examples/battery_plc_workstation.md index e27e4df1..3553fd20 100644 --- a/docs/developer_guide/examples/battery_plc_workstation.md +++ b/docs/developer_guide/examples/battery_plc_workstation.md @@ -1,4 +1,4 @@ -# 实例:电池装配工站接入(PLC控制) +# 实例:电池装配工站接入(PLC 控制) > **文档类型**:实际应用案例 > **适用场景**:使用 PLC 控制的电池装配工站接入 @@ -50,8 +50,6 @@ class CoinCellAssemblyWorkstation(WorkstationBase): self.client = tcp.register_node_list(self.nodes) ``` - - ## 2. 编写驱动与寄存器读写 ### 2.1 寄存器示例 @@ -95,9 +93,9 @@ def start_and_read_metrics(self): 完成工站类与驱动后,需要生成(或更新)工站注册表供系统识别。 - ### 3.1 新增工站设备(或资源)首次生成注册表 -首先通过以下命令启动unilab。进入unilab系统状态检查页面 + +首先通过以下命令启动 unilab。进入 unilab 系统状态检查页面 ```bash python unilabos\app\main.py -g celljson.json --ak --sk @@ -112,35 +110,32 @@ python unilabos\app\main.py -g celljson.json --ak --sk ![注册表生成流程](image_battery_plc/unilab_registry_process.png) 步骤说明: + 1. 选择新增的工站`coin_cell_assembly.py`文件 2. 点击分析按钮,分析`coin_cell_assembly.py`文件 3. 选择`coin_cell_assembly.py`文件中继承`WorkstationBase`类 -4. 填写新增的工站.py文件与`unilabos`目录的距离。例如,新增的工站文件`coin_cell_assembly.py`路径为`unilabos\devices\workstation\coin_cell_assembly\coin_cell_assembly.py`,则此处填写`unilabos.devices.workstation.coin_cell_assembly`。 +4. 填写新增的工站.py 文件与`unilabos`目录的距离。例如,新增的工站文件`coin_cell_assembly.py`路径为`unilabos\devices\workstation\coin_cell_assembly\coin_cell_assembly.py`,则此处填写`unilabos.devices.workstation.coin_cell_assembly`。 5. 此处填写新定义工站的类的名字(名称可以自拟) 6. 填写新的工站注册表备注信息 7. 生成注册表 -以上操作步骤完成,则会生成的新的注册表YAML文件,如下图: +以上操作步骤完成,则会生成的新的注册表 YAML 文件,如下图: ![生成的YAML文件](image_battery_plc/unilab_new_yaml.png) - - - - - ### 3.2 添加新生成注册表 -在`unilabos\registry\devices`目录下新建一个yaml文件,此处新建文件命名为`coincellassemblyworkstation_device.yaml`,将上面生成的新的注册表信息粘贴到`coincellassemblyworkstation_device.yaml`文件中。 + +在`unilabos\registry\devices`目录下新建一个 yaml 文件,此处新建文件命名为`coincellassemblyworkstation_device.yaml`,将上面生成的新的注册表信息粘贴到`coincellassemblyworkstation_device.yaml`文件中。 在终端输入以下命令进行注册表补全操作。 + ```bash python unilabos\app\register.py --complete_registry ``` - ### 3.3 启动并上传注册表 -新增设备之后,启动unilab需要增加`--upload_registry`参数,来上传注册表信息。 +新增设备之后,启动 unilab 需要增加`--upload_registry`参数,来上传注册表信息。 ```bash python unilabos\app\main.py -g celljson.json --ak --sk --upload_registry @@ -159,6 +154,7 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC ### 4.2 首次接入流程 首次新增设备(或资源)需要完整流程: + 1. ✅ 在网页端生成注册表信息 2. ✅ 使用 `--complete_registry` 补全注册表 3. ✅ 使用 `--upload_registry` 上传注册表信息 @@ -166,11 +162,12 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC ### 4.3 驱动更新流程 如果不是新增设备,仅修改了工站驱动的 `.py` 文件: + 1. ✅ 运行 `--complete_registry` 补全注册表 2. ✅ 运行 `--upload_registry` 上传注册表 3. ❌ 不需要在网页端重新生成注册表 -### 4.4 PLC通信注意事项 +### 4.4 PLC 通信注意事项 - **握手机制**:若需参数下发,建议在 PLC 端设置标志寄存器并完成握手复位,避免粘连与竞争 - **字节序**:FLOAT32 等多字节数据类型需要正确指定字节序(如 `WorderOrder.LITTLE`) @@ -203,5 +200,3 @@ module: unilabos.devices.workstation.coin_cell_assembly.coin_cell_assembly:CoinC 5. ✅ 新增设备与更新驱动的区别 这个案例展示了完整的 PLC 设备接入流程,可以作为其他类似设备接入的参考模板。 - - diff --git a/docs/developer_guide/examples/workstation_architecture.md b/docs/developer_guide/examples/workstation_architecture.md index 52f1966f..fddfd95c 100644 --- a/docs/developer_guide/examples/workstation_architecture.md +++ b/docs/developer_guide/examples/workstation_architecture.md @@ -16,8 +16,8 @@ 这类工站由开发者自研,组合所有子设备和实验耗材、希望让他们在工作站这一级协调配合; -1. 工作站包含大量已经注册的子设备,可能各自通信组态很不相同;部分设备可能会拥有同一个通信设备作为出口,如2个泵共用1个串口、所有设备共同接入PLC等。 -2. 任务系统是统一实现的 protocols,protocols 中会将高层指令处理成各子设备配合的工作流 json并管理执行、同时更改物料信息 +1. 工作站包含大量已经注册的子设备,可能各自通信组态很不相同;部分设备可能会拥有同一个通信设备作为出口,如 2 个泵共用 1 个串口、所有设备共同接入 PLC 等。 +2. 任务系统是统一实现的 protocols,protocols 中会将高层指令处理成各子设备配合的工作流 json 并管理执行、同时更改物料信息 3. 物料系统较为简单直接,如常量有机化学仅为工作站内固定的瓶子,初始化时就已固定;随后在任务执行过程中,记录试剂量更改信息 ### 0.2 移液工作站:物料系统和工作流模板管理 @@ -35,7 +35,7 @@ 由厂家开发,具备完善的物料系统、任务系统甚至调度系统;由 PLC 或 OpenAPI TCP 协议统一通信 1. 在监控状态时,希望展现子设备的状态;但子设备仅为逻辑概念,通信由工作站上位机接口提供;部分情况下,子设备状态是被记录在文件中的,需要读取 -2. 工作站有自己的工作流系统甚至调度系统;可以通过脚本/PLC连续读写来配置工作站可用的工作流; +2. 工作站有自己的工作流系统甚至调度系统;可以通过脚本/PLC 连续读写来配置工作站可用的工作流; 3. 部分拥有完善的物料入库、出库、过程记录,需要与 Uni-Lab-OS 物料系统对接 ## 1. 整体架构图 @@ -49,7 +49,7 @@ graph TB RPN[ROS2WorkstationNode
Protocol执行引擎] WB -.post_init关联.-> RPN end - + subgraph "物料管理系统" DECK[Deck
PLR本地物料系统] RS[ResourceSynchronizer
外部物料同步器] @@ -57,7 +57,7 @@ graph TB WB --> RS RS --> DECK end - + subgraph "通信与子设备管理" HW[hardware_interface
硬件通信接口] SUBDEV[子设备集合
pumps/grippers/sensors] @@ -65,7 +65,7 @@ graph TB RPN --> SUBDEV HW -.代理模式.-> RPN end - + subgraph "工作流任务系统" PROTO[Protocol定义
LiquidHandling/PlateHandling] WORKFLOW[Workflow执行器
步骤管理与编排] @@ -85,32 +85,32 @@ graph LR HW2[通信接口
hardware_interface] HTTP[HTTP服务
WorkstationHTTPService] end - + subgraph "外部物料系统" BIOYOND[Bioyond物料管理] LIMS[LIMS系统] WAREHOUSE[第三方仓储] end - + subgraph "外部硬件系统" PLC[PLC设备] SERIAL[串口设备] ROBOT[机械臂/机器人] end - + subgraph "云端系统" CLOUD[UniLab云端
资源管理] MONITOR[监控与调度] end - + BIOYOND <-->|RPC双向同步| DECK2 LIMS -->|HTTP报送| HTTP WAREHOUSE <-->|API对接| DECK2 - + PLC <-->|Modbus TCP| HW2 SERIAL <-->|串口通信| HW2 ROBOT <-->|SDK/API| HW2 - + WS -->|ROS消息| CLOUD CLOUD -->|任务下发| WS MONITOR -->|状态查询| WS @@ -123,40 +123,40 @@ graph TB subgraph "工作站基类" BASE[WorkstationBase
抽象基类] end - + subgraph "Bioyond集成工作站" BW[BioyondWorkstation] BW_DECK[Deck + Warehouses] BW_SYNC[BioyondResourceSynchronizer] BW_HW[BioyondV1RPC] BW_HTTP[HTTP报送服务] - + BW --> BW_DECK BW --> BW_SYNC BW --> BW_HW BW --> BW_HTTP end - + subgraph "纯协议节点" PN[ProtocolNode] PN_SUB[子设备集合] PN_PROTO[Protocol工作流] - + PN --> PN_SUB PN --> PN_PROTO end - + subgraph "PLC控制工作站" PW[PLCWorkstation] PW_DECK[Deck物料系统] PW_PLC[Modbus PLC客户端] PW_WF[工作流定义] - + PW --> PW_DECK PW --> PW_PLC PW --> PW_WF end - + BASE -.继承.-> BW BASE -.继承.-> PN BASE -.继承.-> PW @@ -175,25 +175,25 @@ classDiagram +hardware_interface: Union[Any, str] +current_workflow_status: WorkflowStatus +supported_workflows: Dict[str, WorkflowInfo] - + +post_init(ros_node)* +set_hardware_interface(interface) +call_device_method(method, *args, **kwargs) +get_device_status() +is_device_available() - + +get_deck() +get_all_resources() +find_resource_by_name(name) +find_resources_by_type(type) +sync_with_external_system() - + +execute_workflow(name, params) +stop_workflow(emergency) +workflow_status +is_busy } - + class ROS2WorkstationNode { +device_id: str +children: Dict[str, Any] @@ -202,7 +202,7 @@ classDiagram +_action_clients: Dict +_action_servers: Dict +resource_tracker: DeviceNodeResourceTracker - + +initialize_device(device_id, config) +create_ros_action_server(action_name, mapping) +execute_single_action(device_id, action, kwargs) @@ -210,14 +210,14 @@ classDiagram +transfer_resource_to_another(resources, target, sites) +_setup_hardware_proxy(device, comm_device, read, write) } - + %% 物料管理相关类 class Deck { +name: str +children: List +assign_child_resource() } - + class ResourceSynchronizer { <> +workstation: WorkstationBase @@ -225,23 +225,23 @@ classDiagram +sync_to_external(plr_resource)* +handle_external_change(change_info)* } - + class BioyondResourceSynchronizer { +bioyond_api_client: BioyondV1RPC +sync_interval: int +last_sync_time: float - + +initialize() +sync_from_external() +sync_to_external(resource) +handle_external_change(change_info) } - + %% 硬件接口相关类 class HardwareInterface { <> } - + class BioyondV1RPC { +base_url: str +api_key: str @@ -249,7 +249,7 @@ classDiagram +add_material() +material_inbound() } - + %% 服务类 class WorkstationHTTPService { +workstation: WorkstationBase @@ -257,7 +257,7 @@ classDiagram +port: int +server: HTTPServer +running: bool - + +start() +stop() +_handle_step_finish_report() @@ -266,13 +266,13 @@ classDiagram +_handle_material_change_report() +_handle_error_handling_report() } - + %% 具体实现类 class BioyondWorkstation { +bioyond_config: Dict +workflow_mappings: Dict +workflow_sequence: List - + +post_init(ros_node) +transfer_resource_to_another() +resource_tree_add(resources) @@ -280,25 +280,25 @@ classDiagram +get_all_workflows() +get_bioyond_status() } - + class ProtocolNode { +post_init(ros_node) } - + %% 核心关系 WorkstationBase o-- ROS2WorkstationNode : post_init关联 WorkstationBase o-- WorkstationHTTPService : 可选服务 - + %% 物料管理侧 WorkstationBase *-- Deck : deck WorkstationBase *-- ResourceSynchronizer : 可选组合 ResourceSynchronizer <|-- BioyondResourceSynchronizer - + %% 硬件接口侧 WorkstationBase o-- HardwareInterface : hardware_interface HardwareInterface <|.. BioyondV1RPC : 实现 BioyondResourceSynchronizer --> BioyondV1RPC : 使用 - + %% 继承关系 BioyondWorkstation --|> WorkstationBase ProtocolNode --|> WorkstationBase @@ -316,49 +316,49 @@ sequenceDiagram participant HW as HardwareInterface participant ROS as ROS2WorkstationNode participant HTTP as HTTPService - + APP->>WS: 创建工作站实例(__init__) WS->>DECK: 初始化PLR Deck DECK->>DECK: 创建Warehouse等子资源 DECK-->>WS: Deck创建完成 - + WS->>HW: 创建硬件接口(如BioyondV1RPC) HW->>HW: 建立连接(PLC/RPC/串口等) HW-->>WS: 硬件接口就绪 - + WS->>SYNC: 创建ResourceSynchronizer(可选) SYNC->>HW: 使用hardware_interface SYNC->>SYNC: 初始化同步配置 SYNC-->>WS: 同步器创建完成 - + WS->>SYNC: sync_from_external() SYNC->>HW: 查询外部物料系统 HW-->>SYNC: 返回物料数据 SYNC->>DECK: 转换并添加到Deck SYNC-->>WS: 同步完成 - + Note over WS: __init__完成,等待ROS节点 - + APP->>ROS: 初始化ROS2WorkstationNode ROS->>ROS: 初始化子设备(children) ROS->>ROS: 创建Action客户端 ROS->>ROS: 设置硬件接口代理 ROS-->>APP: ROS节点就绪 - + APP->>WS: post_init(ros_node) WS->>WS: self._ros_node = ros_node WS->>ROS: update_resource([deck]) ROS->>ROS: 上传物料到云端 ROS-->>WS: 上传完成 - + WS->>HTTP: 创建WorkstationHTTPService(可选) HTTP->>HTTP: 启动HTTP服务器线程 HTTP-->>WS: HTTP服务启动 - + WS-->>APP: 工作站完全就绪 ``` -## 4. 工作流执行时序图(Protocol模式) +## 4. 工作流执行时序图(Protocol 模式) ```{mermaid} sequenceDiagram @@ -369,15 +369,15 @@ sequenceDiagram participant DECK as PLR Deck participant CLOUD as 云端资源管理 participant DEV as 子设备 - + CLIENT->>ROS: 发送Protocol Action请求 ROS->>ROS: execute_protocol回调 ROS->>ROS: 从Goal提取参数 ROS->>ROS: 调用protocol_steps_generator ROS->>ROS: 生成action步骤列表 - + ROS->>WS: 更新workflow_status = RUNNING - + loop 执行每个步骤 alt 调用子设备 ROS->>ROS: execute_single_action(device_id, action, params) @@ -398,19 +398,19 @@ sequenceDiagram end WS-->>ROS: 返回结果 end - + ROS->>DECK: 更新本地物料状态 DECK->>DECK: 修改PLR资源属性 end - + ROS->>CLOUD: 同步物料到云端(可选) CLOUD-->>ROS: 同步完成 - + ROS->>WS: 更新workflow_status = COMPLETED ROS-->>CLIENT: 返回Protocol Result ``` -## 5. HTTP报送处理时序图 +## 5. HTTP 报送处理时序图 ```{mermaid} sequenceDiagram @@ -420,25 +420,25 @@ sequenceDiagram participant DECK as PLR Deck participant SYNC as ResourceSynchronizer participant CLOUD as 云端 - + EXT->>HTTP: POST /report/step_finish HTTP->>HTTP: 解析请求数据 HTTP->>HTTP: 验证LIMS协议字段 HTTP->>WS: process_step_finish_report(request) - + WS->>WS: 增加接收计数(_reports_received_count++) WS->>WS: 记录步骤完成事件 WS->>DECK: 更新相关物料状态(可选) DECK->>DECK: 修改PLR资源状态 - + WS->>WS: 保存报送记录到内存 - + WS-->>HTTP: 返回处理结果 HTTP->>HTTP: 构造HTTP响应 HTTP-->>EXT: 200 OK + acknowledgment_id - + Note over EXT,CLOUD: 类似处理sample_finish, order_finish等报送 - + alt 物料变更报送 EXT->>HTTP: POST /report/material_change HTTP->>WS: process_material_change_report(data) @@ -463,7 +463,7 @@ sequenceDiagram participant HW as HardwareInterface participant HTTP as HTTPService participant LOG as 日志系统 - + alt 设备错误(ROS Action失败) DEV->>ROS: Action返回失败结果 ROS->>ROS: 记录错误信息 @@ -475,7 +475,7 @@ sequenceDiagram WS->>WS: 记录错误历史 WS->>LOG: 记录错误日志 end - + alt 关键错误需要停止 WS->>ROS: stop_workflow(emergency=True) ROS->>ROS: 取消所有进行中的Action @@ -487,44 +487,44 @@ sequenceDiagram WS->>ROS: 触发重试逻辑(可选) ROS->>DEV: 重新发送Action end - + WS-->>HTTP: 返回错误处理结果 HTTP-->>DEV: 200 OK + 处理状态 ``` ## 7. 典型工作站实现示例 -### 7.1 Bioyond集成工作站实现 +### 7.1 Bioyond 集成工作站实现 ```python class BioyondWorkstation(WorkstationBase): def __init__(self, bioyond_config: Dict, deck: Deck, *args, **kwargs): # 初始化deck super().__init__(deck=deck, *args, **kwargs) - + # 设置硬件接口为Bioyond RPC客户端 self.hardware_interface = BioyondV1RPC(bioyond_config) - + # 创建资源同步器 self.resource_synchronizer = BioyondResourceSynchronizer(self) - + # 从Bioyond同步物料到本地deck self.resource_synchronizer.sync_from_external() - + # 配置工作流 self.workflow_mappings = bioyond_config.get("workflow_mappings", {}) - + def post_init(self, ros_node: ROS2WorkstationNode): """ROS节点就绪后的初始化""" self._ros_node = ros_node - + # 上传deck(包括所有物料)到云端 ROS2DeviceNode.run_async_func( - self._ros_node.update_resource, - True, + self._ros_node.update_resource, + True, resources=[self.deck] ) - + def resource_tree_add(self, resources: List[ResourcePLR]): """添加物料并同步到Bioyond""" for resource in resources: @@ -537,24 +537,24 @@ class BioyondWorkstation(WorkstationBase): ```python class ProtocolNode(WorkstationBase): """纯协议节点,不需要物料管理和外部通信""" - + def __init__(self, deck: Optional[Deck] = None, *args, **kwargs): super().__init__(deck=deck, *args, **kwargs) # 不设置hardware_interface和resource_synchronizer # 所有功能通过子设备协同完成 - + def post_init(self, ros_node: ROS2WorkstationNode): self._ros_node = ros_node # 不需要上传物料或其他初始化 ``` -### 7.3 PLC直接控制工作站 +### 7.3 PLC 直接控制工作站 ```python class PLCWorkstation(WorkstationBase): def __init__(self, plc_config: Dict, deck: Deck, *args, **kwargs): super().__init__(deck=deck, *args, **kwargs) - + # 设置硬件接口为Modbus客户端 from pymodbus.client import ModbusTcpClient self.hardware_interface = ModbusTcpClient( @@ -562,7 +562,7 @@ class PLCWorkstation(WorkstationBase): port=plc_config["port"] ) self.hardware_interface.connect() - + # 定义支持的工作流 self.supported_workflows = { "battery_assembly": WorkflowInfo( @@ -574,49 +574,49 @@ class PLCWorkstation(WorkstationBase): parameters_schema={"quantity": int, "model": str} ) } - + def execute_workflow(self, workflow_name: str, parameters: Dict): """通过PLC执行工作流""" workflow_id = self._get_workflow_id(workflow_name) - + # 写入PLC寄存器启动工作流 self.hardware_interface.write_register(100, workflow_id) self.hardware_interface.write_register(101, parameters["quantity"]) - + self.current_workflow_status = WorkflowStatus.RUNNING return True ``` ## 8. 核心接口说明 -### 8.1 WorkstationBase核心属性 +### 8.1 WorkstationBase 核心属性 -| 属性 | 类型 | 说明 | -| --------------------------- | ----------------------- | ----------------------------- | -| `_ros_node` | ROS2WorkstationNode | ROS节点引用,由post_init设置 | -| `deck` | Deck | PyLabRobot Deck,本地物料系统 | -| `plr_resources` | Dict[str, PLRResource] | 物料资源映射 | -| `resource_synchronizer` | ResourceSynchronizer | 外部物料同步器(可选) | -| `hardware_interface` | Union[Any, str] | 硬件接口或代理字符串 | -| `current_workflow_status` | WorkflowStatus | 当前工作流状态 | -| `supported_workflows` | Dict[str, WorkflowInfo] | 支持的工作流定义 | +| 属性 | 类型 | 说明 | +| ------------------------- | ----------------------- | ------------------------------- | +| `_ros_node` | ROS2WorkstationNode | ROS 节点引用,由 post_init 设置 | +| `deck` | Deck | PyLabRobot Deck,本地物料系统 | +| `plr_resources` | Dict[str, PLRResource] | 物料资源映射 | +| `resource_synchronizer` | ResourceSynchronizer | 外部物料同步器(可选) | +| `hardware_interface` | Union[Any, str] | 硬件接口或代理字符串 | +| `current_workflow_status` | WorkflowStatus | 当前工作流状态 | +| `supported_workflows` | Dict[str, WorkflowInfo] | 支持的工作流定义 | ### 8.2 必须实现的方法 -- `post_init(ros_node)`: ROS节点就绪后的初始化,必须实现 +- `post_init(ros_node)`: ROS 节点就绪后的初始化,必须实现 ### 8.3 硬件接口相关方法 - `set_hardware_interface(interface)`: 设置硬件接口 - `call_device_method(method, *args, **kwargs)`: 统一设备方法调用 - - 支持直接模式: 直接调用hardware_interface的方法 - - 支持代理模式: hardware_interface="proxy:device_id"通过ROS转发 + - 支持直接模式: 直接调用 hardware_interface 的方法 + - 支持代理模式: hardware_interface="proxy:device_id"通过 ROS 转发 - `get_device_status()`: 获取设备状态 - `is_device_available()`: 检查设备可用性 ### 8.4 物料管理方法 -- `get_deck()`: 获取PLR Deck +- `get_deck()`: 获取 PLR Deck - `get_all_resources()`: 获取所有物料 - `find_resource_by_name(name)`: 按名称查找物料 - `find_resources_by_type(type)`: 按类型查找物料 @@ -630,7 +630,7 @@ class PLCWorkstation(WorkstationBase): - `is_busy`: 检查是否忙碌(属性) - `workflow_runtime`: 获取运行时间(属性) -### 8.6 可选的HTTP报送处理方法 +### 8.6 可选的 HTTP 报送处理方法 - `process_step_finish_report()`: 步骤完成处理 - `process_sample_finish_report()`: 样本完成处理 @@ -638,10 +638,10 @@ class PLCWorkstation(WorkstationBase): - `process_material_change_report()`: 物料变更处理 - `handle_external_error()`: 错误处理 -### 8.7 ROS2WorkstationNode核心方法 +### 8.7 ROS2WorkstationNode 核心方法 - `initialize_device(device_id, config)`: 初始化子设备 -- `create_ros_action_server(action_name, mapping)`: 创建Action服务器 +- `create_ros_action_server(action_name, mapping)`: 创建 Action 服务器 - `execute_single_action(device_id, action, kwargs)`: 执行单个动作 - `update_resource(resources)`: 同步物料到云端 - `transfer_resource_to_another(...)`: 跨设备物料转移 @@ -698,7 +698,7 @@ workstation = BioyondWorkstation( "config": {...} }, "gripper_1": { - "type": "device", + "type": "device", "driver": "RobotiqGripperDriver", "communication": "io_modbus_1", "config": {...} @@ -720,7 +720,7 @@ workstation = BioyondWorkstation( } ``` -### 9.3 HTTP服务配置 +### 9.3 HTTP 服务配置 ```python from unilabos.devices.workstation.workstation_http_service import WorkstationHTTPService @@ -741,31 +741,31 @@ http_service.start() ### 10.1 清晰的职责分离 - **WorkstationBase**: 负责物料管理(deck)、硬件接口(hardware_interface)、工作流状态管理 -- **ROS2WorkstationNode**: 负责子设备管理、Protocol执行、云端物料同步 -- **ResourceSynchronizer**: 可选的外部物料系统同步(如Bioyond) -- **WorkstationHTTPService**: 可选的HTTP报送接收服务 +- **ROS2WorkstationNode**: 负责子设备管理、Protocol 执行、云端物料同步 +- **ResourceSynchronizer**: 可选的外部物料系统同步(如 Bioyond) +- **WorkstationHTTPService**: 可选的 HTTP 报送接收服务 ### 10.2 灵活的硬件接口模式 -1. **直接模式**: hardware_interface是具体对象(如BioyondV1RPC、ModbusClient) -2. **代理模式**: hardware_interface="proxy:device_id",通过ROS节点转发到子设备 +1. **直接模式**: hardware_interface 是具体对象(如 BioyondV1RPC、ModbusClient) +2. **代理模式**: hardware_interface="proxy:device_id",通过 ROS 节点转发到子设备 3. **混合模式**: 工作站有自己的接口,同时管理多个子设备 ### 10.3 统一的物料系统 -- 基于PyLabRobot Deck的标准化物料表示 -- 通过ResourceSynchronizer实现与外部系统(如Bioyond、LIMS)的双向同步 -- 通过ROS2WorkstationNode实现与云端的物料状态同步 +- 基于 PyLabRobot Deck 的标准化物料表示 +- 通过 ResourceSynchronizer 实现与外部系统(如 Bioyond、LIMS)的双向同步 +- 通过 ROS2WorkstationNode 实现与云端的物料状态同步 -### 10.4 Protocol驱动的工作流 +### 10.4 Protocol 驱动的工作流 -- ROS2WorkstationNode负责Protocol的执行和步骤管理 -- 支持子设备协同(通过Action Client调用) -- 支持工作站直接控制(通过hardware_interface) +- ROS2WorkstationNode 负责 Protocol 的执行和步骤管理 +- 支持子设备协同(通过 Action Client 调用) +- 支持工作站直接控制(通过 hardware_interface) -### 10.5 可选的HTTP报送服务 +### 10.5 可选的 HTTP 报送服务 -- 基于LIMS协议规范的统一报送接口 +- 基于 LIMS 协议规范的统一报送接口 - 支持步骤完成、样本完成、任务完成、物料变更等多种报送类型 - 与工作站解耦,可独立启停 diff --git a/docs/developer_guide/networking_overview.md b/docs/developer_guide/networking_overview.md index e2cda520..40b308d3 100644 --- a/docs/developer_guide/networking_overview.md +++ b/docs/developer_guide/networking_overview.md @@ -592,4 +592,3 @@ ros2 topic list - [ROS2 网络配置](https://docs.ros.org/en/humble/Tutorials/Advanced/Networking.html) - [DDS 配置](https://fast-dds.docs.eprosima.com/) - Uni-Lab 云平台文档 - diff --git a/unilabos/app/oss_upload.py b/unilabos/app/oss_upload.py index 6f9431fe..e7cbc3f2 100644 --- a/unilabos/app/oss_upload.py +++ b/unilabos/app/oss_upload.py @@ -1,161 +1,156 @@ import argparse import os import time +from datetime import datetime from typing import Dict, Optional, Tuple import requests -from unilabos.config.config import OSSUploadConfig +from unilabos.app.web.client import http_client, HTTPClient +from unilabos.utils import logger -def _init_upload(file_path: str, oss_path: str, filename: Optional[str] = None, - process_key: str = "file-upload", device_id: str = "default", - expires_hours: int = 1) -> Tuple[bool, Dict]: +def _get_oss_token( + filename: str, + driver_name: str = "default", + exp_type: str = "default", + client: Optional[HTTPClient] = None, +) -> Tuple[bool, Dict]: """ - 初始化上传过程 + 获取OSS上传Token Args: - file_path: 本地文件路径 - oss_path: OSS目标路径 - filename: 文件名,如果为None则使用file_path的文件名 - process_key: 处理键 - device_id: 设备ID - expires_hours: 链接过期小时数 + filename: 文件名 + driver_name: 驱动名称 + exp_type: 实验类型 + client: HTTPClient实例,如果不提供则使用默认的http_client Returns: - (成功标志, 响应数据) + (成功标志, Token数据字典包含token/path/host/expires) """ - if filename is None: - filename = os.path.basename(file_path) + # 使用提供的client或默认的http_client + if client is None: + client = http_client - # 构造初始化请求 - url = f"{OSSUploadConfig.api_host}{OSSUploadConfig.init_endpoint}" - headers = { - "Authorization": OSSUploadConfig.authorization, - "Content-Type": "application/json" - } + # 构造scene参数: driver_name-exp_type + scene = f"{driver_name}-{exp_type}" - payload = { - "device_id": device_id, - "process_key": process_key, - "filename": filename, - "path": oss_path, - "expires_hours": expires_hours - } + # 构造请求URL,使用client的remote_addr(已包含/api/v1/) + url = f"{client.remote_addr}/applications/token" + params = {"scene": scene, "filename": filename} try: - response = requests.post(url, headers=headers, json=payload) - if response.status_code == 201: - result = response.json() - if result.get("code") == "10000": - return True, result.get("data", {}) + logger.info(f"[OSS] 请求预签名URL: scene={scene}, filename={filename}") + response = requests.get(url, params=params, headers={"Authorization": f"Lab {client.auth}"}, timeout=10) - print(f"初始化上传失败: {response.status_code}, {response.text}") + if response.status_code == 200: + result = response.json() + if result.get("code") == 0: + data = result.get("data", {}) + + # 转换expires时间戳为可读格式 + expires_timestamp = data.get("expires", 0) + expires_datetime = datetime.fromtimestamp(expires_timestamp) + expires_str = expires_datetime.strftime("%Y-%m-%d %H:%M:%S") + + logger.info(f"[OSS] 获取预签名URL成功") + logger.info(f"[OSS] - URL: {data.get('url', 'N/A')}") + logger.info(f"[OSS] - Expires: {expires_str} (timestamp: {expires_timestamp})") + + return True, data + + logger.error(f"[OSS] 获取预签名URL失败: {response.status_code}, {response.text}") return False, {} except Exception as e: - print(f"初始化上传异常: {str(e)}") + logger.error(f"[OSS] 获取预签名URL异常: {str(e)}") return False, {} def _put_upload(file_path: str, upload_url: str) -> bool: """ - 执行PUT上传 + 使用预签名URL上传文件到OSS Args: file_path: 本地文件路径 - upload_url: 上传URL + upload_url: 完整的预签名上传URL Returns: 是否成功 """ try: + logger.info(f"[OSS] 开始上传文件: {file_path}") + with open(file_path, "rb") as f: - response = requests.put(upload_url, data=f) + # 使用预签名URL上传,不需要额外的认证header + response = requests.put(upload_url, data=f, timeout=300) + if response.status_code == 200: + logger.info(f"[OSS] 文件上传成功") return True - print(f"PUT上传失败: {response.status_code}, {response.text}") + logger.error(f"[OSS] 上传失败: {response.status_code}") + logger.error(f"[OSS] 响应内容: {response.text[:500] if response.text else '无响应内容'}") return False except Exception as e: - print(f"PUT上传异常: {str(e)}") + logger.error(f"[OSS] 上传异常: {str(e)}") return False -def _complete_upload(uuid: str) -> bool: - """ - 完成上传过程 - - Args: - uuid: 上传的UUID - - Returns: - 是否成功 - """ - url = f"{OSSUploadConfig.api_host}{OSSUploadConfig.complete_endpoint}" - headers = { - "Authorization": OSSUploadConfig.authorization, - "Content-Type": "application/json" - } - - payload = { - "uuid": uuid - } - - try: - response = requests.post(url, headers=headers, json=payload) - if response.status_code == 200: - result = response.json() - if result.get("code") == "10000": - return True - - print(f"完成上传失败: {response.status_code}, {response.text}") - return False - except Exception as e: - print(f"完成上传异常: {str(e)}") - return False - - -def oss_upload(file_path: str, oss_path: str, filename: Optional[str] = None, - process_key: str = "file-upload", device_id: str = "default") -> bool: +def oss_upload( + file_path: str, + filename: Optional[str] = None, + driver_name: str = "default", + exp_type: str = "default", + max_retries: int = 3, + client: Optional[HTTPClient] = None, +) -> Dict: """ 文件上传主函数,包含重试机制 Args: file_path: 本地文件路径 - oss_path: OSS目标路径 filename: 文件名,如果为None则使用file_path的文件名 - process_key: 处理键 - device_id: 设备ID + driver_name: 驱动名称,用于构造scene + exp_type: 实验类型,用于构造scene + max_retries: 最大重试次数 + client: HTTPClient实例,如果不提供则使用默认的http_client Returns: - 是否成功上传 + Dict: { + "success": bool, # 是否上传成功 + "original_path": str, # 原始文件路径 + "oss_path": str # OSS路径(成功时)或空字符串(失败时) + } """ - max_retries = OSSUploadConfig.max_retries + if filename is None: + filename = os.path.basename(file_path) + + if not os.path.exists(file_path): + logger.error(f"[OSS] 文件不存在: {file_path}") + return {"success": False, "original_path": file_path, "oss_path": ""} + retry_count = 0 + oss_path = "" while retry_count < max_retries: try: - # 步骤1:初始化上传 - init_success, init_data = _init_upload( - file_path=file_path, - oss_path=oss_path, - filename=filename, - process_key=process_key, - device_id=device_id + # 步骤1:获取预签名URL + token_success, token_data = _get_oss_token( + filename=filename, driver_name=driver_name, exp_type=exp_type, client=client ) - if not init_success: - print(f"初始化上传失败,重试 {retry_count + 1}/{max_retries}") + if not token_success: + logger.warning(f"[OSS] 获取预签名URL失败,重试 {retry_count + 1}/{max_retries}") retry_count += 1 - time.sleep(1) # 等待1秒后重试 + time.sleep(1) continue - # 获取UUID和上传URL - uuid = init_data.get("uuid") - upload_url = init_data.get("upload_url") + # 获取预签名URL和OSS路径 + upload_url = token_data.get("url") + oss_path = token_data.get("path", "") - if not uuid or not upload_url: - print(f"初始化上传返回数据不完整,重试 {retry_count + 1}/{max_retries}") + if not upload_url: + logger.warning(f"[OSS] 无法获取上传URL,API未返回url字段") retry_count += 1 time.sleep(1) continue @@ -163,69 +158,82 @@ def oss_upload(file_path: str, oss_path: str, filename: Optional[str] = None, # 步骤2:PUT上传文件 put_success = _put_upload(file_path, upload_url) if not put_success: - print(f"PUT上传失败,重试 {retry_count + 1}/{max_retries}") - retry_count += 1 - time.sleep(1) - continue - - # 步骤3:完成上传 - complete_success = _complete_upload(uuid) - if not complete_success: - print(f"完成上传失败,重试 {retry_count + 1}/{max_retries}") + logger.warning(f"[OSS] PUT上传失败,重试 {retry_count + 1}/{max_retries}") retry_count += 1 time.sleep(1) continue # 所有步骤都成功 - print(f"文件 {file_path} 上传成功") - return True + logger.info(f"[OSS] 文件 {file_path} 上传成功") + return {"success": True, "original_path": file_path, "oss_path": oss_path} except Exception as e: - print(f"上传过程异常: {str(e)},重试 {retry_count + 1}/{max_retries}") + logger.error(f"[OSS] 上传过程异常: {str(e)},重试 {retry_count + 1}/{max_retries}") retry_count += 1 time.sleep(1) - print(f"文件 {file_path} 上传失败,已达到最大重试次数 {max_retries}") - return False + logger.error(f"[OSS] 文件 {file_path} 上传失败,已达到最大重试次数 {max_retries}") + return {"success": False, "original_path": file_path, "oss_path": oss_path} if __name__ == "__main__": - # python -m unilabos.app.oss_upload -f /path/to/your/file.txt + # python -m unilabos.app.oss_upload -f /path/to/your/file.txt --driver HPLC --type test + # python -m unilabos.app.oss_upload -f /path/to/your/file.txt --driver HPLC --type test \ + # --ak xxx --sk yyy --remote-addr http://xxx/api/v1 # 命令行参数解析 - parser = argparse.ArgumentParser(description='文件上传测试工具') - parser.add_argument('--file', '-f', type=str, required=True, help='要上传的本地文件路径') - parser.add_argument('--path', '-p', type=str, default='/HPLC1/Any', help='OSS目标路径') - parser.add_argument('--device', '-d', type=str, default='test-device', help='设备ID') - parser.add_argument('--process', '-k', type=str, default='HPLC-txt-result', help='处理键') + parser = argparse.ArgumentParser(description="文件上传测试工具") + parser.add_argument("--file", "-f", type=str, required=True, help="要上传的本地文件路径") + parser.add_argument("--driver", "-d", type=str, default="default", help="驱动名称") + parser.add_argument("--type", "-t", type=str, default="default", help="实验类型") + parser.add_argument("--ak", type=str, help="Access Key,如果提供则覆盖配置") + parser.add_argument("--sk", type=str, help="Secret Key,如果提供则覆盖配置") + parser.add_argument("--remote-addr", type=str, help="远程服务器地址(包含/api/v1),如果提供则覆盖配置") args = parser.parse_args() # 检查文件是否存在 if not os.path.exists(args.file): - print(f"错误:文件 {args.file} 不存在") + logger.error(f"错误:文件 {args.file} 不存在") exit(1) - print("=" * 50) - print(f"开始上传文件: {args.file}") - print(f"目标路径: {args.path}") - print(f"设备ID: {args.device}") - print(f"处理键: {args.process}") - print("=" * 50) + # 如果提供了ak/sk/remote_addr,创建临时HTTPClient + temp_client = None + if args.ak and args.sk: + import base64 + + auth = base64.b64encode(f"{args.ak}:{args.sk}".encode("utf-8")).decode("utf-8") + remote_addr = args.remote_addr if args.remote_addr else http_client.remote_addr + temp_client = HTTPClient(remote_addr=remote_addr, auth=auth) + logger.info(f"[配置] 使用自定义配置: remote_addr={remote_addr}") + elif args.remote_addr: + temp_client = HTTPClient(remote_addr=args.remote_addr, auth=http_client.auth) + logger.info(f"[配置] 使用自定义remote_addr: {args.remote_addr}") + else: + logger.info(f"[配置] 使用默认配置: remote_addr={http_client.remote_addr}") + + logger.info("=" * 50) + logger.info(f"开始上传文件: {args.file}") + logger.info(f"驱动名称: {args.driver}") + logger.info(f"实验类型: {args.type}") + logger.info(f"Scene: {args.driver}-{args.type}") + logger.info("=" * 50) # 执行上传 - success = oss_upload( + result = oss_upload( file_path=args.file, - oss_path=args.path, filename=None, # 使用默认文件名 - process_key=args.process, - device_id=args.device + driver_name=args.driver, + exp_type=args.type, + client=temp_client, ) # 输出结果 - if success: - print("\n√ 文件上传成功!") + if result["success"]: + logger.info(f"\n√ 文件上传成功!") + logger.info(f"原始路径: {result['original_path']}") + logger.info(f"OSS路径: {result['oss_path']}") exit(0) else: - print("\n× 文件上传失败!") + logger.error(f"\n× 文件上传失败!") + logger.error(f"原始路径: {result['original_path']}") exit(1) - diff --git a/unilabos/config/config.py b/unilabos/config/config.py index b5bc6191..41f2fb5f 100644 --- a/unilabos/config/config.py +++ b/unilabos/config/config.py @@ -36,15 +36,6 @@ class WSConfig: ping_interval = 30 # ping间隔(秒) -# OSS上传配置 -class OSSUploadConfig: - api_host = "" - authorization = "" - init_endpoint = "" - complete_endpoint = "" - max_retries = 3 - - # HTTP配置 class HTTPConfig: remote_addr = "http://127.0.0.1:48197/api/v1" diff --git a/unilabos/devices/hplc/AgilentHPLC.py b/unilabos/devices/hplc/AgilentHPLC.py index d47c80d9..2e43c255 100644 --- a/unilabos/devices/hplc/AgilentHPLC.py +++ b/unilabos/devices/hplc/AgilentHPLC.py @@ -405,9 +405,19 @@ class RunningResultChecker(DriverChecker): for i in range(self.driver._finished, temp): sample_id = self.driver._get_resource_sample_id(self.driver._wf_name, i) # 从0开始计数 pdf, txt = self.driver.get_data_file(i + 1) - device_id = self.driver.device_id if hasattr(self.driver, "device_id") else "default" - oss_upload(pdf, f"hplc/{sample_id}/{os.path.basename(pdf)}", process_key="example", device_id=device_id) - oss_upload(txt, f"hplc/{sample_id}/{os.path.basename(txt)}", process_key="HPLC-txt-result", device_id=device_id) + # 使用新的OSS上传接口,传入driver_name和exp_type + pdf_result = oss_upload(pdf, filename=os.path.basename(pdf), driver_name="HPLC", exp_type="analysis") + txt_result = oss_upload(txt, filename=os.path.basename(txt), driver_name="HPLC", exp_type="result") + + if pdf_result["success"]: + print(f"PDF上传成功: {pdf_result['oss_path']}") + else: + print(f"PDF上传失败: {pdf_result['original_path']}") + + if txt_result["success"]: + print(f"TXT上传成功: {txt_result['oss_path']}") + else: + print(f"TXT上传失败: {txt_result['original_path']}") # self.driver.extract_data_from_txt() except Exception as ex: self.driver._finished = 0 @@ -456,8 +466,12 @@ if __name__ == "__main__": } sample_id = obj._get_resource_sample_id("test", 0) pdf, txt = obj.get_data_file("1", after_time=datetime(2024, 11, 6, 19, 3, 6)) - oss_upload(pdf, f"hplc/{sample_id}/{os.path.basename(pdf)}", process_key="example") - oss_upload(txt, f"hplc/{sample_id}/{os.path.basename(txt)}", process_key="HPLC-txt-result") + # 使用新的OSS上传接口,传入driver_name和exp_type + pdf_result = oss_upload(pdf, filename=os.path.basename(pdf), driver_name="HPLC", exp_type="analysis") + txt_result = oss_upload(txt, filename=os.path.basename(txt), driver_name="HPLC", exp_type="result") + + print(f"PDF上传结果: {pdf_result}") + print(f"TXT上传结果: {txt_result}") # driver = HPLCDriver() # for i in range(10000): # print({k: v for k, v in driver._device_status.items() if isinstance(v, str)})