Files
Uni-Lab-OS/unilabos/ros/nodes/presets/host_node.py
Xuwznln 7e9e93b29c Prcix9320 (#207)
* 0.10.7 Update (#101)

* Cleanup registry to be easy-understanding (#76)

* delete deprecated mock devices

* rename categories

* combine chromatographic devices

* rename rviz simulation nodes

* organic virtual devices

* parse vessel_id

* run registry completion before merge

---------

Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>

* fix: workstation handlers and vessel_id parsing

* fix: working dir error when input config path
feat: report publish topic when error

* modify default discovery_interval to 15s

* feat: add trace log level

* feat: 添加ChinWe设备控制类,支持串口通信和电机控制功能 (#79)

* fix: drop_tips not using auto resource select

* fix: discard_tips error

* fix: discard_tips

* fix: prcxi_res

* add: prcxi res
fix: startup slow

* feat: workstation example

* fix pumps and liquid_handler handle

* feat: 优化protocol node节点运行日志

* fix all protocol_compilers and remove deprecated devices

* feat: 新增use_remote_resource参数

* fix and remove redundant info

* bugfixes on organic protocols

* fix filter protocol

* fix protocol node

* 临时兼容错误的driver写法

* fix: prcxi import error

* use call_async in all service to avoid deadlock

* fix: figure_resource

* Update recipe.yaml

* add workstation template and battery example

* feat: add sk & ak

* update workstation base

* Create workstation_architecture.md

* refactor: workstation_base 重构为仅含业务逻辑,通信和子设备管理交给 ProtocolNode

* refactor: ProtocolNode→WorkstationNode

* Add:msgs.action (#83)

* update: Workstation dev 将版本号从 0.10.3 更新为 0.10.4 (#84)

* Add:msgs.action

* update: 将版本号从 0.10.3 更新为 0.10.4

* simplify resource system

* uncompleted refactor

* example for use WorkstationBase

* feat: websocket

* feat: websocket test

* feat: workstation example

* feat: action status

* fix: station自己的方法注册错误

* fix: 还原protocol node处理方法

* fix: build

* fix: missing job_id key

* ws test version 1

* ws test version 2

* ws protocol

* 增加物料关系上传日志

* 增加物料关系上传日志

* 修正物料关系上传

* 修复工站的tracker实例追踪失效问题

* 增加handle检测,增加material edge关系上传

* 修复event loop错误

* 修复edge上报错误

* 修复async错误

* 更新schema的title字段

* 主机节点信息等支持自动刷新

* 注册表编辑器

* 修复status密集发送时,消息出错

* 增加addr参数

* fix: addr param

* fix: addr param

* 取消labid 和 强制config输入

* Add action definitions for LiquidHandlerSetGroup and LiquidHandlerTransferGroup

- Created LiquidHandlerSetGroup.action with fields for group name, wells, and volumes.
- Created LiquidHandlerTransferGroup.action with fields for source and target group names and unit volume.
- Both actions include response fields for return information and success status.

* Add LiquidHandlerSetGroup and LiquidHandlerTransferGroup actions to CMakeLists

* Add set_group and transfer_group methods to PRCXI9300Handler and update liquid_handler.yaml

* result_info改为字典类型

* 新增uat的地址替换

* runze multiple pump support

(cherry picked from commit 49354fcf39)

* remove runze multiple software obtainer

(cherry picked from commit 8bcc92a394)

* support multiple backbone

(cherry picked from commit 4771ff2347)

* Update runze pump format

* Correct runze multiple backbone

* Update runze_multiple_backbone

* Correct runze pump multiple receive method.

* Correct runze pump multiple receive method.

* 对于PRCXI9320的transfer_group,一对多和多对多

* 移除MQTT,更新launch文档,提供注册表示例文件,更新到0.10.5

* fix import error

* fix dupe upload registry

* refactor ws client

* add server timeout

* Fix: run-column with correct vessel id (#86)

* fix run_column

* Update run_column_protocol.py

(cherry picked from commit e5aa4d940a)

* resource_update use resource_add

* 新增版位推荐功能

* 重新规定了版位推荐的入参

* update registry with nested obj

* fix protocol node log_message, added create_resource return value

* fix protocol node log_message, added create_resource return value

* try fix add protocol

* fix resource_add

* 修复移液站错误的aspirate注册表

* Feature/xprbalance-zhida (#80)

* feat(devices): add Zhida GC/MS pretreatment automation workstation

* feat(devices): add mettler_toledo xpr balance

* balance

* 重新补全zhida注册表

* PRCXI9320 json

* PRCXI9320 json

* PRCXI9320 json

* fix resource download

* remove class for resource

* bump version to 0.10.6

* 更新所有注册表

* 修复protocolnode的兼容性

* 修复protocolnode的兼容性

* Update install md

* Add Defaultlayout

* 更新物料接口

* fix dict to tree/nested-dict converter

* coin_cell_station draft

* refactor: rename "station_resource" to "deck"

* add standardized BIOYOND resources: bottle_carrier, bottle

* refactor and add BIOYOND resources tests

* add BIOYOND deck assignment and pass all tests

* fix: update resource with correct structure; remove deprecated liquid_handler set_group action

* feat: 将新威电池测试系统驱动与配置文件并入 workstation_dev_YB2 (#92)

* feat: 新威电池测试系统驱动与注册文件

* feat: bring neware driver & battery.json into workstation_dev_YB2

* add bioyond studio draft

* bioyond station with communication init and resource sync

* fix bioyond station and registry

* fix: update resource with correct structure; remove deprecated liquid_handler set_group action

* frontend_docs

* create/update resources with POST/PUT for big amount/ small amount data

* create/update resources with POST/PUT for big amount/ small amount data

* refactor: add itemized_carrier instead of carrier consists of ResourceHolder

* create warehouse by factory func

* update bioyond launch json

* add child_size for itemized_carrier

* fix bioyond resource io

* Workstation templates: Resources and its CRUD, and workstation tasks (#95)

* coin_cell_station draft

* refactor: rename "station_resource" to "deck"

* add standardized BIOYOND resources: bottle_carrier, bottle

* refactor and add BIOYOND resources tests

* add BIOYOND deck assignment and pass all tests

* fix: update resource with correct structure; remove deprecated liquid_handler set_group action

* feat: 将新威电池测试系统驱动与配置文件并入 workstation_dev_YB2 (#92)

* feat: 新威电池测试系统驱动与注册文件

* feat: bring neware driver & battery.json into workstation_dev_YB2

* add bioyond studio draft

* bioyond station with communication init and resource sync

* fix bioyond station and registry

* create/update resources with POST/PUT for big amount/ small amount data

* refactor: add itemized_carrier instead of carrier consists of ResourceHolder

* create warehouse by factory func

* update bioyond launch json

* add child_size for itemized_carrier

* fix bioyond resource io

---------

Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com>
Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com>

* 更新物料接口

* Workstation dev yb2 (#100)

* Refactor and extend reaction station action messages

* Refactor dispensing station tasks to enhance parameter clarity and add batch processing capabilities

- Updated `create_90_10_vial_feeding_task` to include detailed parameters for 90%/10% vial feeding, improving clarity and usability.
- Introduced `create_batch_90_10_vial_feeding_task` for batch processing of 90%/10% vial feeding tasks with JSON formatted input.
- Added `create_batch_diamine_solution_task` for batch preparation of diamine solution, also utilizing JSON formatted input.
- Refined `create_diamine_solution_task` to include additional parameters for better task configuration.
- Enhanced schema descriptions and default values for improved user guidance.

* 修复to_plr_resources

* add update remove

* 支持选择器注册表自动生成
支持转运物料

* 修复资源添加

* 修复transfer_resource_to_another生成

* 更新transfer_resource_to_another参数,支持spot入参

* 新增test_resource动作

* fix host_node error

* fix host_node test_resource error

* fix host_node test_resource error

* 过滤本地动作

* 移动内部action以兼容host node

* 修复同步任务报错不显示的bug

* feat: 允许返回非本节点物料,后面可以通过decoration进行区分,就不进行warning了

* update todo

* modify bioyond/plr converter, bioyond resource registry, and tests

* pass the tests

* update todo

* add conda-pack-build.yml

* add auto install script for conda-pack-build.yml

(cherry picked from commit 172599adcf)

* update conda-pack-build.yml

* update conda-pack-build.yml

* update conda-pack-build.yml

* update conda-pack-build.yml

* update conda-pack-build.yml

* Add version in __init__.py
Update conda-pack-build.yml
Add create_zip_archive.py

* Update conda-pack-build.yml

* Update conda-pack-build.yml (with mamba)

* Update conda-pack-build.yml

* Fix FileNotFoundError

* Try fix 'charmap' codec can't encode characters in position 16-23: character maps to <undefined>

* Fix unilabos msgs search error

* Fix environment_check.py

* Update recipe.yaml

* Update registry. Update uuid loop figure method. Update install docs.

* Fix nested conda pack

* Fix one-key installation path error

* Bump version to 0.10.7

* Workshop bj (#99)

* Add LaiYu Liquid device integration and tests

Introduce LaiYu Liquid device implementation, including backend, controllers, drivers, configuration, and resource files. Add hardware connection, tip pickup, and simplified test scripts, as well as experiment and registry configuration for LaiYu Liquid. Documentation and .gitignore for the device are also included.

* feat(LaiYu_Liquid): 重构设备模块结构并添加硬件文档

refactor: 重新组织LaiYu_Liquid模块目录结构
docs: 添加SOPA移液器和步进电机控制指令文档
fix: 修正设备配置中的最大体积默认值
test: 新增工作台配置测试用例
chore: 删除过时的测试脚本和配置文件

* add

* 重构: 将 LaiYu_Liquid.py 重命名为 laiyu_liquid_main.py 并更新所有导入引用

- 使用 git mv 将 LaiYu_Liquid.py 重命名为 laiyu_liquid_main.py
- 更新所有相关文件中的导入引用
- 保持代码功能不变,仅改善命名一致性
- 测试确认所有导入正常工作

* 修复: 在 core/__init__.py 中添加 LaiYuLiquidBackend 导出

- 添加 LaiYuLiquidBackend 到导入列表
- 添加 LaiYuLiquidBackend 到 __all__ 导出列表
- 确保所有主要类都可以正确导入

* 修复大小写文件夹名字

* 电池装配工站二次开发教程(带目录)上传至dev (#94)

* 电池装配工站二次开发教程

* Update intro.md

* 物料教程

* 更新物料教程,json格式注释

* Update prcxi driver & fix transfer_liquid mix_times (#90)

* Update prcxi driver & fix transfer_liquid mix_times

* fix: correct mix_times type

* Update liquid_handler registry

* test: prcxi.py

* Update registry from pr

* fix ony-key script not exist

* clean files

---------

Co-authored-by: Junhan Chang <changjh@dp.tech>
Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>
Co-authored-by: Guangxin Zhang <guangxin.zhang.bio@gmail.com>
Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com>
Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com>
Co-authored-by: LccLink <1951855008@qq.com>
Co-authored-by: lixinyu1011 <61094742+lixinyu1011@users.noreply.github.com>
Co-authored-by: shiyubo0410 <shiyubo@dp.tech>

* fix startup env check.
add auto install during one-key installation

* Try fix one-key build on linux

* Complete all one key installation

* fix: rename schema field to resource_schema with serialization and validation aliases (#104)

Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>

* Fix one-key installation build

Install conda-pack before pack command

Add conda-pack to base when building one-key installer

Fix param error when using mamba run

Try fix one-key build on linux

* Fix conda pack on windows

* add plr_to_bioyond, and refactor bioyond stations

* modify default config

* Fix one-key installation build for windows

* Fix workstation startup
Update registry

* Fix/resource UUID and doc fix (#109)

* Fix ResourceTreeSet load error

* Raise error when using unsupported type to create ResourceTreeSet

* Fix children key error

* Fix children key error

* Fix workstation resource not tracking

* Fix workstation deck & children resource dupe

* Fix workstation deck & children resource dupe

* Fix multiple resource error

* Fix resource tree update

* Fix resource tree update

* Force confirm uuid

* Tip more error log

* Refactor Bioyond workstation and experiment workflow (#105)

Refactored the Bioyond workstation classes to improve parameter handling and workflow management. Updated experiment.py to use BioyondReactionStation with deck and material mappings, and enhanced workflow step parameter mapping and execution logic. Adjusted JSON experiment configs, improved workflow sequence handling, and added UUID assignment to PLR materials. Removed unused station_config and material cache logic, and added detailed docstrings and debug output for workflow methods.

* Fix resource get.
Fix resource parent not found.
Mapping uuid for all resources.

* mount parent uuid

* Add logging configuration based on BasicConfig in main function

* fix workstation node error

* fix workstation node error

* Update boot example

* temp fix for resource get

* temp fix for resource get

* provide error info when cant find plr type

* pack repo info

* fix to plr type error

* fix to plr type error

* Update regular container method

* support no size init

* fix comprehensive_station.json

* fix comprehensive_station.json

* fix type conversion

* fix state loading for regular container

* Update deploy-docs.yml

* Update deploy-docs.yml

---------

Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>

* Close #107
Update doc url.

* Fix/update resource (#112)

* cancel upload_registry

* Refactor Bioyond workstation and experiment workflow -fix (#111)

* refactor(bioyond_studio): 优化材料缓存加载和参数验证逻辑

改进材料缓存加载逻辑以支持多种材料类型和详细材料处理
更新工作流参数验证中的字段名从key/value改为Key/DisplayValue
移除未使用的merge_workflow_with_parameters方法
添加get_station_info方法获取工作站基础信息
清理实验文件中的注释代码和更新导入路径

* fix: 修复资源移除时的父资源检查问题

在BaseROS2DeviceNode中,移除资源前添加对父资源是否为None的检查,避免空指针异常
同时更新Bottle和BottleCarrier类以支持**kwargs参数
修正测试文件中Liquid_feeding_beaker的大小写拼写错误

* correct return message

---------

Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>

* fix resource_get in action

* fix(reaction_station): 清空工作流序列和参数避免重复执行 (#113)

在创建任务后清空工作流序列和参数,防止下次执行时累积重复

* Update create_resource device_id

* Update ResourceTracker

add more enumeration in POSE

fix converter in resource_tracker

* Update graphio together with workstation design.

fix(reaction_station): 为步骤参数添加Value字段传个BY后端

fix(bioyond/warehouses): 修正仓库尺寸和物品排列参数

调整仓库的x轴和z轴物品数量以及物品尺寸参数,使其符合4x1x4的规格要求

fix warehouse serialize/deserialize

fix bioyond converter

fix itemized_carrier.unassign_child_resource

allow not-loaded MSG in registry

add layout serializer & converter

warehouseuse A1-D4; add warehouse layout

fix(graphio): 修正bioyond到plr资源转换中的坐标计算错误

Fix resource assignment and type mapping issues

Corrects resource assignment in ItemizedCarrier by using the correct spot key from _ordering. Updates graphio to use 'typeName' instead of 'name' for type mapping in resource_bioyond_to_plr. Renames DummyWorkstation to BioyondWorkstation in workstation_http_service for clarity.

* Update workstation & bioyond example

Refine descriptions in Bioyond reaction station YAML

Updated and clarified field and operation descriptions in the reaction_station_bioyond.yaml file for improved accuracy and consistency. Changes include more precise terminology, clearer parameter explanations, and standardized formatting for operation schemas.

refactor(workstation): 更新反应站参数描述并添加分液站配置文件

修正反应站方法参数描述,使其更准确清晰
添加bioyond_dispensing_station.yaml配置文件

add create_workflow script and test

add invisible_slots to carriers

fix(warehouses): 修正bioyond_warehouse_1x4x4仓库的尺寸参数

调整仓库的num_items_x和num_items_z值以匹配实际布局,并更新物品尺寸参数

save resource get data. allow empty value for layout and cross_section_type

More decks&plates support for bioyond (#115)

refactor(registry): 重构反应站设备配置,简化并更新操作命令

移除旧的自动操作命令,新增针对具体化学操作的命令配置
更新模块路径和配置结构,优化参数定义和描述

fix(dispensing_station): 修正物料信息查询方法调用

将直接调用material_id_query改为通过hardware_interface调用,以符合接口设计规范

* PRCXI Update

修改prcxi连线

prcxi样例图

Create example_prcxi.json

* Update resource extra & uuid.

use ordering to convert identifier to idx

convert identifier to site idx

correct extra key

update extra before transfer

fix multiple instance error

add resource_tree_transfer func

fox itemrized carrier assign child resource

support internal device material transfer

remove extra key

use same callback group

support material extra

support material extra
support update_resource_site in extra

* Update workstation.

modify workstation_architecture docs

bioyond_HR (#133)

* feat: Enhance Bioyond synchronization and resource management

- Implemented synchronization for all material types (consumables, samples, reagents) from Bioyond, logging detailed information for each type.
- Improved error handling and logging during synchronization processes.
- Added functionality to save Bioyond material IDs in UniLab resources for future updates.
- Enhanced the `sync_to_external` method to handle material movements correctly, including querying and creating materials in Bioyond.
- Updated warehouse configurations to support new storage types and improved layout for better resource management.
- Introduced new resource types such as reactors and tip boxes, with detailed specifications.
- Modified warehouse factory to support column offsets for naming conventions (e.g., A05-D08).
- Improved resource tracking by merging extra attributes instead of overwriting them.
- Added a new method for updating resources in Bioyond, ensuring better synchronization of resource changes.

* feat: 添加TipBox和Reactor的配置到bottles.yaml

* fix: 修复液体投料方法中的volume参数处理逻辑

修复solid_feeding_vials方法中的volume参数处理逻辑,优化solvents参数的使用条件

更新液体投料方法,支持通过溶剂信息自动计算体积,添加solvents参数并更新文档描述

Add batch creation methods for vial and solution tasks

添加批量创建90%10%小瓶投料任务和二胺溶液配置任务的功能,更新相关参数和默认值

* 封膜仪、撕膜仪、耗材站接口

* 添加Raman和xrd相关代码

* Resource update & asyncio fix

correct bioyond config

prcxi example

fix append_resource

fix regularcontainer

fix cancel error

fix resource_get param

fix json dumps

support name change during materials change

enable slave mode

change uuid logger to trace level

correct remove_resource stats

disable slave connect websocket

adjust with_children param

modify devices to use correct executor (sleep, create_task)

support sleep and create_task in node

fix run async execution error

* bump version to 0.10.9

update registry

* PRCXI Reset Error Correction (#166)

* change 9320 desk row number to 4

* Updated 9320 host address

* Updated 9320 host address

* Add **kwargs in classes: PRCXI9300Deck and PRCXI9300Container

* Removed all sample_id in prcxi_9320.json to avoid KeyError

* 9320 machine testing settings

* Typo

* Rewrite setup logic to clear error code

* 初始化 step_mode 属性

* 1114物料手册定义教程byxinyu (#165)

* 宜宾奔耀工站deck前端by_Xinyu

* 构建物料教程byxinyu

* 1114物料手册定义教程

* 3d sim (#97)

* 修改lh的json启动

* 修改lh的json启动

* 修改backend,做成sim的通用backend

* 修改yaml的地址,3D模型适配网页生产环境

* 添加laiyu硬件连接

* 修改移液枪的状态判断方法,

修改移液枪的状态判断方法,
添加三轴的表定点与零点之间的转换
添加三轴真实移动的backend

* 修改laiyu移液站

简化移动方法,
取消软件限制位置,
修改当值使用Z轴时也需要重新复位Z轴的问题

* 更新lh以及laiyu workshop

1,现在可以直接通过修改backend,适配其他的移液站,主类依旧使用LiquidHandler,不用重新编写

2,修改枪头判断标准,使用枪头自身判断而不是类的判断,

3,将归零参数用毫米计算,方便手动调整,

4,修改归零方式,上电使用机械归零,确定机械零点,手动归零设置工作区域零点方便计算,二者互不干涉

* 修改枪头动作

* 修改虚拟仿真方法

---------

Co-authored-by: zhangshixiang <@zhangshixiang>
Co-authored-by: Junhan Chang <changjh@dp.tech>

* 标准化opcua设备接入unilab (#78)

* 初始提交,只保留工作区当前状态

* remove redundant arm_slider meshes

---------

Co-authored-by: Junhan Chang <changjh@dp.tech>

* add new laiyu liquid driver, yaml and json files (#164)

* HR物料同步,前端展示位置修复 (#135)

* 更新Bioyond工作站配置,添加新的物料类型映射和载架定义,优化物料查询逻辑

* 添加Bioyond实验配置文件,定义物料类型映射和设备配置

* 更新bioyond_warehouse_reagent_stack方法,修正试剂堆栈尺寸和布局描述

* 更新Bioyond实验配置,修正物料类型映射,优化设备配置

* 更新Bioyond资源同步逻辑,优化物料入库流程,增强错误处理和日志记录

* 更新Bioyond资源,添加配液站和反应站专用载架,优化仓库工厂函数的排序方式

* 更新Bioyond资源,添加配液站和反应站相关载架,优化试剂瓶和样品瓶配置

* 更新Bioyond实验配置,修正试剂瓶载架ID,确保与设备匹配

* 更新Bioyond资源,移除反应站单烧杯载架,添加反应站单烧瓶载架分类

* Refactor Bioyond resource synchronization and update bottle carrier definitions

- Removed traceback printing in error handling for Bioyond synchronization.
- Enhanced logging for existing Bioyond material ID usage during synchronization.
- Added new bottle carrier definitions for single flask and updated existing ones.
- Refactored dispensing station and reaction station bottle definitions for clarity and consistency.
- Improved resource mapping and error handling in graphio for Bioyond resource conversion.
- Introduced layout parameter in warehouse factory for better warehouse configuration.

* 更新Bioyond仓库工厂,添加排序方式支持,优化坐标计算逻辑

* 更新Bioyond载架和甲板配置,调整样品板尺寸和仓库坐标

* 更新Bioyond资源同步,增强占用位置日志信息,修正坐标转换逻辑

* 更新Bioyond反应站和分配站配置,调整材料类型映射和ID,移除不必要的项

* support name change during materials change

* fix json dumps

* correct tip

* 优化调度器API路径,更新相关方法描述

* 更新 BIOYOND 载架相关文档,调整 API 以支持自带试剂瓶的载架类型,修复资源获取时的子物料处理逻辑

* 实现资源删除时的同步处理,优化出库操作逻辑

* 修复 ItemizedCarrier 中的可见性逻辑

* 保存 Bioyond 原始信息到 unilabos_extra,以便出库时查询

* 根据 resource.capacity 判断是试剂瓶(载架)还是多瓶载架,走不同的奔曜转换

* Fix bioyond bottle_carriers ordering

* 优化 Bioyond 物料同步逻辑,增强坐标解析和位置更新处理

* disable slave connect websocket

* correct remove_resource stats

* change uuid logger to trace level

* enable slave mode

* refactor(bioyond): 统一资源命名并优化物料同步逻辑

- 将DispensingStation和ReactionStation资源统一为PolymerStation命名
- 优化物料同步逻辑,支持耗材类型(typeMode=0)的查询
- 添加物料默认参数配置功能
- 调整仓库坐标布局
- 清理废弃资源定义

* feat(warehouses): 为仓库函数添加col_offset和layout参数

* refactor: 更新实验配置中的物料类型映射命名

将DispensingStation和ReactionStation的物料类型映射统一更名为PolymerStation,保持命名一致性

* fix: 更新实验配置中的载体名称从6VialCarrier到6StockCarrier

* feat(bioyond): 实现物料创建与入库分离逻辑

将物料同步流程拆分为两个独立阶段:transfer阶段只创建物料,add阶段执行入库
简化状态检查接口,仅返回连接状态

* fix(reaction_station): 修正液体进料烧杯体积单位并增强返回结果

将液体进料烧杯的体积单位从μL改为g以匹配实际使用场景
在返回结果中添加merged_workflow和order_params字段,提供更完整的工作流信息

* feat(dispensing_station): 在任务创建返回结果中添加order_params信息

在create_order方法返回结果中增加order_params字段,以便调用方获取完整的任务参数

* fix(dispensing_station): 修改90%物料分配逻辑从分成3份改为直接使用

原逻辑将主称固体平均分成3份作为90%物料,现改为直接使用main_portion

* feat(bioyond): 添加任务编码和任务ID的输出,支持批量任务创建后的状态监控

* refactor(registry): 简化设备配置中的任务结果处理逻辑

将多个单独的任务编码和ID字段合并为统一的return_info字段
更新相关描述以反映新的数据结构

* feat(工作站): 添加HTTP报送服务和任务完成状态跟踪

- 在graphio.py中添加API必需字段
- 实现工作站HTTP服务启动和停止逻辑
- 添加任务完成状态跟踪字典和等待方法
- 重写任务完成报送处理方法记录状态
- 支持批量任务完成等待和报告获取

* refactor(dispensing_station): 移除wait_for_order_completion_and_get_report功能

该功能已被wait_for_multiple_orders_and_get_reports替代,简化代码结构

* fix: 更新任务报告API错误

* fix(workstation_http_service): 修复状态查询中device_id获取逻辑

处理状态查询时安全获取device_id,避免因属性不存在导致的异常

* fix(bioyond_studio): 改进物料入库失败时的错误处理和日志记录

在物料入库API调用失败时,添加更详细的错误信息打印
同时修正station.py中对空响应和失败情况的判断逻辑

* refactor(bioyond): 优化瓶架载体的分配逻辑和注释说明

重构瓶架载体的分配逻辑,使用嵌套循环替代硬编码索引分配
添加更详细的坐标映射说明,明确PLR与Bioyond坐标的对应关系

* fix(bioyond_rpc): 修复物料入库成功时无data字段返回空的问题

当API返回成功但无data字段时,返回包含success标识的字典而非空字典

---------

Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>
Co-authored-by: Junhan Chang <changjh@dp.tech>

* nmr

* Update devices

* bump version to 0.10.10

* Update repo files.

* Add get_resource_with_dir & get_resource method

* fix camera & workstation & warehouse & reaction station driver

* update docs, test examples
fix liquid_handler init bug

* bump version to 0.10.11

* Add startup_json_path, disable_browser, port config

* Update oss config

* feat(bioyond_studio): 添加项目API接口支持及优化物料管理功能

添加通用项目API接口方法(_post_project_api, _delete_project_api)用于与LIMS系统交互
实现compute_experiment_design方法用于实验设计计算
新增brief_step_parameters等订单相关接口方法
优化物料转移逻辑,增加异步任务处理
扩展BioyondV1RPC类,添加批量物料操作、订单状态管理等功能

* feat(bioyond): 添加测量小瓶仓库和更新仓库工厂函数参数

* Support unilabos_samples key

* add session_id and normal_exit

* Add result schema and add TypedDict conversion.

* Fix port error

* Add backend api and update doc

* Add get_regular_container func

* Add get_regular_container func

* Transfer_liquid (#176)

* change 9320 desk row number to 4

* Updated 9320 host address

* Updated 9320 host address

* Add **kwargs in classes: PRCXI9300Deck and PRCXI9300Container

* Removed all sample_id in prcxi_9320.json to avoid KeyError

* 9320 machine testing settings

* Typo

* Typo in base_device_node.py

* Enhance liquid handling functionality by adding support for multiple transfer modes (one-to-many, one-to-one, many-to-one) and improving parameter validation. Default channel usage is set when not specified. Adjusted mixing logic to ensure it only occurs when valid conditions are met. Updated documentation for clarity.

* Auto dump logs, fix workstation input schema

* Fix startup with remote resource error

Resource dict fully change to "pose" key

Update oss link

Reduce pylabrobot conversion warning & force enable log dump.

更新 logo 图片

* signal when host node is ready

* fix ros2 future

print all logs to file
fix resource dict dump error

* update version to 0.10.12

* 修改sample_uuid的返回值

* 修改pose标签设定机制

* 添加 aspiate函数返回值

* 返回dispense后的sample_uuid

* 添加self.pending_liquids_dict的重置方法

* 修改prcxi的json文件,解决trach错误问题

* 修改prcxijson,防止PlateT4的硬件错误

* 对laiyu移液站进行部分修改,取消多次初始化的问题

* 修改根据新的物料格式,修改可视化

* 添加切换枪头方法,添加mock振荡与加热方法

* 夹爪添加

* 删除多余的laiyu部分

* 云端可启动夹爪

* Delete __init__.py

* Enhance PRCXI9300 classes with new Container and TipRack implementations, improving state management and initialization logic. Update JSON configuration to reflect type changes for containers and plates.

* 修改上传数据

---------

Co-authored-by: Junhan Chang <changjh@dp.tech>
Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>
Co-authored-by: Guangxin Zhang <guangxin.zhang.bio@gmail.com>
Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com>
Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com>
Co-authored-by: LccLink <1951855008@qq.com>
Co-authored-by: lixinyu1011 <61094742+lixinyu1011@users.noreply.github.com>
Co-authored-by: shiyubo0410 <shiyubo@dp.tech>
Co-authored-by: hh.(SII) <103566763+Mile-Away@users.noreply.github.com>
Co-authored-by: Xianwei Qi <qxw@stu.pku.edu.cn>
Co-authored-by: WenzheG <wenzheguo32@gmail.com>
Co-authored-by: Harry Liu <113173203+ALITTLELZ@users.noreply.github.com>
Co-authored-by: q434343 <73513873+q434343@users.noreply.github.com>
Co-authored-by: tt <166512503+tt11142023@users.noreply.github.com>
Co-authored-by: xyc <49015816+xiaoyu10031@users.noreply.github.com>
Co-authored-by: zhangshixiang <@zhangshixiang>
Co-authored-by: zhangshixiang <554662886@qq.com>
Co-authored-by: ALITTLELZ <l_LZlz@163.com>
2025-12-26 02:28:56 +08:00

1483 lines
63 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import collections
from dataclasses import dataclass, field
import json
import threading
import time
import traceback
import uuid
from typing import TYPE_CHECKING, Optional, Dict, Any, List, ClassVar, Set, TypedDict, Union
from action_msgs.msg import GoalStatus
from geometry_msgs.msg import Point
from rclpy.action import ActionClient, get_action_server_names_and_types_by_node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.service import Service
from unilabos_msgs.msg import Resource # type: ignore
from unilabos_msgs.srv import (
ResourceAdd,
ResourceDelete,
ResourceUpdate,
ResourceList,
SerialCommand,
ResourceGet,
) # type: ignore
from unilabos_msgs.srv._serial_command import SerialCommand_Request, SerialCommand_Response
from unique_identifier_msgs.msg import UUID
from unilabos.registry.registry import lab_registry
from unilabos.resources.graphio import initialize_resource
from unilabos.resources.registry import add_schema
from unilabos.ros.initialize_device import initialize_device_from_dict
from unilabos.ros.msgs.message_converter import (
get_msg_type,
get_ros_type_by_msgname,
convert_from_ros_msg,
convert_to_ros_msg,
msg_converter_manager,
)
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode, DeviceNodeResourceTracker
from unilabos.ros.nodes.presets.controller_node import ControllerNode
from unilabos.ros.nodes.resource_tracker import (
ResourceDict,
ResourceDictInstance,
ResourceTreeSet,
ResourceTreeInstance,
)
from unilabos.utils import logger
from unilabos.utils.exception import DeviceClassInvalid
from unilabos.utils.type_check import serialize_result_info
from unilabos.registry.placeholder_type import ResourceSlot, DeviceSlot
if TYPE_CHECKING:
from unilabos.app.ws_client import QueueItem
@dataclass
class DeviceActionStatus:
job_ids: Dict[str, float] = field(default_factory=dict)
class TestResourceReturn(TypedDict):
resources: List[List[ResourceDict]]
devices: List[DeviceSlot]
class HostNode(BaseROS2DeviceNode):
"""
主机节点类,负责管理设备、资源和控制器
作为单例模式实现,确保整个应用中只有一个主机节点实例
"""
_instance: ClassVar[Optional["HostNode"]] = None
_ready_event: ClassVar[threading.Event] = threading.Event()
_device_action_status: ClassVar[collections.defaultdict[str, DeviceActionStatus]] = collections.defaultdict(
DeviceActionStatus
)
_resource_tracker: ClassVar[DeviceNodeResourceTracker] = DeviceNodeResourceTracker() # 资源管理器实例
@classmethod
def get_instance(cls, timeout=None) -> Optional["HostNode"]:
if cls._ready_event.wait(timeout):
return cls._instance
return None
def __init__(
self,
device_id: str,
devices_config: ResourceTreeSet,
resources_config: ResourceTreeSet,
resources_edge_config: list[dict],
physical_setup_graph: Optional[Dict[str, Any]] = None,
controllers_config: Optional[Dict[str, Any]] = None,
bridges: Optional[List[Any]] = None,
discovery_interval: float = 180.0, # 设备发现间隔,单位为秒
):
"""
初始化主机节点
Args:
device_id: 节点名称
devices_config: 设备配置
resources_config: 资源配置
physical_setup_graph: 物理设置图
controllers_config: 控制器配置
bridges: 桥接器列表
discovery_interval: 设备发现间隔默认5秒
"""
if self._instance is not None:
self._instance.lab_logger().critical("[Host Node] HostNode instance already exists.")
# 设置单例实例
self.__class__._instance = self
# 初始化配置
self.server_latest_timestamp = 0.0 #
self.devices_config = devices_config
self.resources_config = resources_config # 直接保存 ResourceTreeSet
self.resources_edge_config = resources_edge_config
self.physical_setup_graph = physical_setup_graph
if controllers_config is None:
controllers_config = {}
self.controllers_config = controllers_config
if bridges is None:
bridges = []
self.bridges = bridges
# 创建 host_node 作为一个单独的 ResourceTree
host_node_dict = {
"id": "host_node",
"uuid": str(uuid.uuid4()),
"parent_uuid": "",
"name": "host_node",
"type": "device",
"class": "host_node",
"config": {},
"data": {},
"children": [],
"description": "",
"schema": {},
"model": {},
"icon": "",
}
# 创建 host_node 的 ResourceTree
host_node_instance = ResourceDictInstance.get_resource_instance_from_dict(host_node_dict)
host_node_tree = ResourceTreeInstance(host_node_instance)
resources_config.trees.insert(0, host_node_tree)
try:
for bridge in self.bridges:
if hasattr(bridge, "resource_tree_add") and resources_config:
from unilabos.app.web.client import HTTPClient
client: HTTPClient = bridge
resource_start_time = time.time()
# 传递 ResourceTreeSet 对象,在 client 中转换为字典并获取 UUID 映射
uuid_mapping = client.resource_tree_add(resources_config, "", True)
device_uuid = resources_config.root_nodes[0].res_content.uuid
resource_end_time = time.time()
logger.info(
f"[Host Node-Resource] 物料上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms"
)
for edge in self.resources_edge_config:
edge["source_uuid"] = uuid_mapping.get(edge["source_uuid"], edge["source_uuid"])
edge["target_uuid"] = uuid_mapping.get(edge["target_uuid"], edge["target_uuid"])
resource_add_res = client.resource_edge_add(self.resources_edge_config)
resource_edge_end_time = time.time()
logger.info(
f"[Host Node-Resource] 物料关系上传 {round(resource_edge_end_time - resource_end_time, 5) * 1000} ms"
)
# resources_config 通过各个设备的 resource_tracker 进行uuid更新利用uuid_mapping
# resources_config 的 root node 是
# # 创建反向映射new_uuid -> old_uuid
# reverse_uuid_mapping = {new_uuid: old_uuid for old_uuid, new_uuid in uuid_mapping.items()}
for tree in resources_config.trees:
node = tree.root_node
if node.res_content.type == "device":
continue
else:
try:
for plr_resource in ResourceTreeSet([tree]).to_plr_resources():
self._resource_tracker.add_resource(plr_resource)
except Exception as ex:
self.lab_logger().warning(f"[Host Node-Resource] 根节点物料{tree}序列化失败!")
except Exception as ex:
logger.error(f"[Host Node-Resource] 添加物料出错!\n{traceback.format_exc()}")
# 初始化Node基类传递空参数覆盖列表
BaseROS2DeviceNode.__init__(
self,
driver_instance=self,
device_id=device_id,
device_uuid=host_node_dict["uuid"],
status_types={},
action_value_mappings=lab_registry.device_type_registry["host_node"]["class"]["action_value_mappings"],
hardware_interface={},
print_publish=False,
resource_tracker=self._resource_tracker, # host node并不是通过initialize 包一层传进来的
)
# 创建设备、动作客户端和目标存储
self.devices_names: Dict[str, str] = {device_id: self.namespace} # 存储设备名称和命名空间的映射
self.devices_instances: Dict[str, ROS2DeviceNode] = {} # 存储设备实例
self.device_machine_names: Dict[str, str] = {
device_id: "本地",
} # 存储设备ID到机器名称的映射
self._action_clients: Dict[str, ActionClient] = { # 为了方便了解实际的数据类型host的默认写好
"/devices/host_node/create_resource": ActionClient(
self,
lab_registry.ResourceCreateFromOuterEasy,
"/devices/host_node/create_resource",
callback_group=self.callback_group,
),
"/devices/host_node/create_resource_detailed": ActionClient(
self,
lab_registry.ResourceCreateFromOuter,
"/devices/host_node/create_resource_detailed",
callback_group=self.callback_group,
),
"/devices/host_node/test_latency": ActionClient(
self,
lab_registry.EmptyIn,
"/devices/host_node/test_latency",
callback_group=self.callback_group,
),
"/devices/host_node/test_resource": ActionClient(
self,
lab_registry.EmptyIn,
"/devices/host_node/test_resource",
callback_group=self.callback_group,
),
"/devices/host_node/_execute_driver_command": ActionClient(
self,
lab_registry.StrSingleInput,
"/devices/host_node/_execute_driver_command",
callback_group=self.callback_group,
),
"/devices/host_node/_execute_driver_command_async": ActionClient(
self,
lab_registry.StrSingleInput,
"/devices/host_node/_execute_driver_command_async",
callback_group=self.callback_group,
),
} # 用来存储多个ActionClient实例
self._action_value_mappings: Dict[str, Dict] = (
{}
) # 用来存储多个ActionClient的type, goal, feedback, result的变量名映射关系
self._goals: Dict[str, Any] = {} # 用来存储多个目标的状态
self._online_devices: Set[str] = {f"{self.namespace}/{device_id}"} # 用于跟踪在线设备
self._last_discovery_time = 0.0 # 上次设备发现的时间
self._discovery_lock = threading.Lock() # 设备发现的互斥锁
self._subscribed_topics = set() # 用于跟踪已订阅的话题
# 创建物料增删改查服务(非客户端)
self._init_host_service()
self.device_status = {} # 用来存储设备状态
self.device_status_timestamps = {} # 用来存储设备状态最后更新时间
time.sleep(1) # 等待通信连接稳定
# 首次发现网络中的设备
self._discover_devices()
# 初始化所有本机设备节点,多一次过滤,防止重复初始化
for device_config in devices_config.root_nodes:
device_id = device_config.res_content.id
if device_config.res_content.type != "device":
continue
if device_id not in self.devices_names:
self.initialize_device(device_id, device_config)
else:
self.lab_logger().warning(f"[Host Node] Device {device_id} already existed, skipping.")
self.update_device_status_subscriptions()
# TODO: 需要验证 初始化所有控制器节点
if controllers_config:
update_rate = controllers_config["controller_manager"]["ros__parameters"]["update_rate"]
for controller_id, controller_config in controllers_config["controller_manager"]["ros__parameters"][
"controllers"
].items():
controller_config["update_rate"] = update_rate
self.initialize_controller(controller_id, controller_config)
# 创建定时器,定期发现设备
self._discovery_timer = self.create_timer(
discovery_interval, self._discovery_devices_callback, callback_group=self.callback_group
)
# 添加ping-pong相关属性
self._ping_responses = {} # 存储ping响应
self._ping_lock = threading.Lock()
self.lab_logger().info("[Host Node] Host node initialized.")
HostNode._ready_event.set()
# 发送host_node ready信号到所有桥接器
for bridge in self.bridges:
if hasattr(bridge, "publish_host_ready"):
bridge.publish_host_ready()
self.lab_logger().debug(f"Host ready signal sent via {bridge.__class__.__name__}")
def _send_re_register(self, sclient):
sclient.wait_for_service()
request = SerialCommand.Request()
request.command = ""
future = sclient.call_async(request)
response = future.result()
def _discover_devices(self) -> None:
"""
发现网络中的设备
检测ROS2网络中的所有设备节点并为它们创建ActionClient
同时检测设备离线情况
"""
self.lab_logger().trace("[Host Node] Discovering devices in the network...")
# 获取当前所有设备
nodes_and_names = self.get_node_names_and_namespaces()
# 跟踪本次发现的设备,用于检测离线设备
current_devices = set()
for device_id, namespace in nodes_and_names:
if not namespace.startswith("/devices/"):
continue
edge_device_id = namespace[9:]
# 将设备添加到当前设备集合
device_key = f"{namespace}/{edge_device_id}" # namespace已经包含device_id了这里复写一遍
current_devices.add(device_key)
# 如果是新设备记录并创建ActionClient
if edge_device_id not in self.devices_names:
self.lab_logger().info(f"[Host Node] Discovered new device: {edge_device_id}")
self.devices_names[edge_device_id] = namespace
self._create_action_clients_for_device(device_id, namespace)
self._online_devices.add(device_key)
sclient = self.create_client(SerialCommand, f"/srv{namespace}/re_register_device")
threading.Thread(
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_re_register_device_{namespace}",
).start()
elif device_key not in self._online_devices:
# 设备重新上线
self.lab_logger().info(f"[Host Node] Device reconnected: {device_key}")
self._online_devices.add(device_key)
sclient = self.create_client(SerialCommand, f"/srv{namespace}/re_register_device")
threading.Thread(
target=self._send_re_register,
args=(sclient,),
daemon=True,
name=f"ROSDevice{self.device_id}_re_register_device_{namespace}",
).start()
# 检测离线设备
offline_devices = self._online_devices - current_devices
for device_key in offline_devices:
self.lab_logger().warning(f"[Host Node] Device offline: {device_key}")
self._online_devices.discard(device_key)
# 更新在线设备列表
self._online_devices = current_devices
self.lab_logger().trace(f"[Host Node] Total online devices: {len(self._online_devices)}")
def _discovery_devices_callback(self) -> None:
"""
设备发现定时器回调函数
"""
# 使用互斥锁确保同时只有一个发现过程
if self._discovery_lock.acquire(blocking=False):
try:
self._discover_devices()
# 发现新设备后,更新设备状态订阅
self.update_device_status_subscriptions()
finally:
self._discovery_lock.release()
else:
self.lab_logger().debug("[Host Node] Device discovery already in progress, skipping.")
def _create_action_clients_for_device(self, device_id: str, namespace: str) -> None:
"""
为设备创建所有必要的ActionClient
Args:
device_id: 设备ID
namespace: 设备命名空间
"""
for action_id, action_types in get_action_server_names_and_types_by_node(self, device_id, namespace):
if action_id not in self._action_clients:
try:
action_type = get_ros_type_by_msgname(action_types[0])
self._action_clients[action_id] = ActionClient(
self, action_type, action_id, callback_group=self.callback_group
)
self.lab_logger().trace(f"[Host Node] Created ActionClient (Discovery): {action_id}")
action_name = action_id[len(namespace) + 1 :]
edge_device_id = namespace[9:]
# from unilabos.app.comm_factory import get_communication_client
# comm_client = get_communication_client()
# info_with_schema = ros_action_to_json_schema(action_type)
# comm_client.publish_actions(action_name, {
# "device_id": edge_device_id,
# "device_type": "",
# "action_name": action_name,
# "schema": info_with_schema,
# })
except Exception as e:
self.lab_logger().error(f"[Host Node] Failed to create ActionClient for {action_id}: {str(e)}")
async def create_resource_detailed(
self,
resources: list[Union[list["Resource"], "Resource"]],
device_ids: list[str],
bind_parent_ids: list[str],
bind_locations: list[Point],
other_calling_params: list[str],
) -> List[str]:
responses = []
for resource, device_id, bind_parent_id, bind_location, other_calling_param in zip(
resources, device_ids, bind_parent_ids, bind_locations, other_calling_params
):
# 这里要求device_id传入必须是edge_device_id
if device_id not in self.devices_names:
self.lab_logger().error(
f"[Host Node] Device {device_id} not found in devices_names. Create resource failed."
)
raise ValueError(f"[Host Node] Device {device_id} not found in devices_names. Create resource failed.")
device_key = f"{self.devices_names[device_id]}/{device_id}"
if device_key not in self._online_devices:
self.lab_logger().error(f"[Host Node] Device {device_key} is offline. Create resource failed.")
raise ValueError(f"[Host Node] Device {device_key} is offline. Create resource failed.")
namespace = self.devices_names[device_id]
srv_address = f"/srv{namespace}/append_resource"
sclient = self.create_client(SerialCommand, srv_address)
sclient.wait_for_service()
request = SerialCommand.Request()
request.command = json.dumps(
{
"resource": resource, # 单个/单组 可为 list[list[Resource]]
"namespace": namespace,
"edge_device_id": device_id,
"bind_parent_id": bind_parent_id,
"bind_location": {
"x": bind_location.x,
"y": bind_location.y,
"z": bind_location.z,
},
"other_calling_param": json.loads(other_calling_param) if other_calling_param else {},
},
ensure_ascii=False,
)
response: SerialCommand.Response = await sclient.call_async(request)
responses.append(response.response)
return responses
async def create_resource(
self,
device_id: str,
res_id: str,
class_name: str,
parent: str,
bind_locations: Point,
liquid_input_slot: list[int] = [],
liquid_type: list[str] = [],
liquid_volume: list[int] = [],
slot_on_deck: str = "",
):
# 暂不支持多对同名父子同时存在
res_creation_input = {
"id": res_id.split("/")[-1],
"name": res_id.split("/")[-1],
"class": class_name,
"parent": parent.split("/")[-1],
"position": {
"x": bind_locations.x,
"y": bind_locations.y,
"z": bind_locations.z,
},
}
if len(liquid_input_slot) and liquid_input_slot[0] == -1: # 目前container只逐个创建
res_creation_input.update(
{
"data": {
"liquids": [
{
"liquid_type": liquid_type[0] if liquid_type else None,
"liquid_volume": liquid_volume[0] if liquid_volume else None,
}
]
}
}
)
init_new_res = initialize_resource(res_creation_input) # flatten的格式
if len(init_new_res) > 1: # 一个物料,多个子节点
init_new_res = [init_new_res]
resources: List[Resource] | List[List[Resource]] = init_new_res # initialize_resource已经返回list[dict]
device_ids = [device_id.split("/")[-1]]
bind_parent_id = [res_creation_input["parent"]]
bind_location = [bind_locations]
other_calling_param = [
json.dumps(
{
"ADD_LIQUID_TYPE": liquid_type,
"LIQUID_VOLUME": liquid_volume,
"LIQUID_INPUT_SLOT": liquid_input_slot,
"initialize_full": False,
"slot": slot_on_deck,
}
)
]
response: List[str] = await self.create_resource_detailed(
resources, device_ids, bind_parent_id, bind_location, other_calling_param
)
try:
new_li = []
for i in response:
res = json.loads(i)
new_li.append(res)
return {"resources": new_li, "liquid_input_resources": new_li}
except Exception as ex:
pass
_n = "\n"
raise ValueError(f"创建资源时失败!\n{_n.join(response)}")
def initialize_device(self, device_id: str, device_config: ResourceDictInstance) -> None:
"""
根据配置初始化设备,
此函数根据提供的设备配置动态导入适当的设备类并创建其实例。
同时为设备的动作值映射设置动作客户端。
Args:
device_id: 设备唯一标识符
device_config: 设备配置字典,包含类型和其他参数
"""
self.lab_logger().info(f"[Host Node] Initializing device: {device_id}")
try:
d = initialize_device_from_dict(device_id, device_config)
except DeviceClassInvalid as e:
self.lab_logger().error(f"[Host Node] Device class invalid: {e}")
d = None
if d is None:
return
# noinspection PyProtectedMember
self.devices_names[device_id] = d._ros_node.namespace # 这里不涉及二级device_id
self.device_machine_names[device_id] = "本地"
self.devices_instances[device_id] = d
# noinspection PyProtectedMember
for action_name, action_value_mapping in d._ros_node._action_value_mappings.items():
if action_name.startswith("auto-") or str(action_value_mapping.get("type", "")).startswith(
"UniLabJsonCommand"
):
continue
action_id = f"/devices/{device_id}/{action_name}"
if action_id not in self._action_clients:
action_type = action_value_mapping["type"]
self._action_clients[action_id] = ActionClient(self, action_type, action_id)
self.lab_logger().trace(
f"[Host Node] Created ActionClient (Local): {action_id}"
) # 子设备再创建用的是Discover发现的
# from unilabos.app.comm_factory import get_communication_client
# comm_client = get_communication_client()
# info_with_schema = ros_action_to_json_schema(action_type)
# comm_client.publish_actions(action_name, {
# "device_id": device_id,
# "device_type": device_config["class"],
# "action_name": action_name,
# "schema": info_with_schema,
# })
else:
self.lab_logger().warning(f"[Host Node] ActionClient {action_id} already exists.")
device_key = f"{self.devices_names[device_id]}/{device_id}" # 这里不涉及二级device_id
# 添加到在线设备列表
self._online_devices.add(device_key)
def update_device_status_subscriptions(self) -> None:
"""
更新设备状态订阅
扫描所有设备话题,为新的话题创建订阅,确保不会重复订阅
"""
topic_names_and_types = self.get_topic_names_and_types()
for topic, types in topic_names_and_types:
# 检查是否为设备状态话题且未订阅过
if (
topic.startswith("/devices/")
and not types[0].endswith("FeedbackMessage")
and "_action" not in topic
and topic not in self._subscribed_topics
):
# 解析设备名和属性名
parts = topic.split("/")
if len(parts) >= 4: # 可能有WorkstationNode创建更长的设备
device_id = "/".join(parts[2:-1])
property_name = parts[-1]
# 初始化设备状态字典
if device_id not in self.device_status:
self.device_status[device_id] = {}
self.device_status_timestamps[device_id] = {}
# 默认初始化属性值为 None
self.device_status[device_id] = collections.defaultdict()
self.device_status_timestamps[device_id][property_name] = 0 # 初始化时间戳
# 动态创建订阅
try:
type_class = msg_converter_manager.search_class(types[0].replace("/", "."))
if type_class is None:
self.lab_logger().error(f"[Host Node] Invalid type {types[0]} for {topic}")
else:
self.create_subscription(
type_class,
topic,
lambda msg, d=device_id, p=property_name: self.property_callback(msg, d, p),
1,
callback_group=self.callback_group,
)
# 标记为已订阅
self._subscribed_topics.add(topic)
self.lab_logger().trace(f"[Host Node] Subscribed to new topic: {topic}")
except (NameError, SyntaxError) as e:
self.lab_logger().error(f"[Host Node] Failed to create subscription for topic {topic}: {e}")
"""设备相关"""
def property_callback(self, msg, device_id: str, property_name: str) -> None:
"""
更新设备状态字典中的属性值,并发送到桥接器。
Args:
msg: 接收到的消息
device_id: 设备ID
property_name: 属性名称
"""
# 更新设备状态字典
if hasattr(msg, "data"):
bChange = False
bCreate = False
if isinstance(msg.data, (float, int, str)):
if property_name not in self.device_status[device_id]:
bCreate = True
bChange = True
self.device_status[device_id][property_name] = msg.data
elif self.device_status[device_id][property_name] != msg.data:
bChange = True
self.device_status[device_id][property_name] = msg.data
# 更新时间戳
self.device_status_timestamps[device_id][property_name] = time.time()
else:
self.lab_logger().debug(
f"[Host Node] Unsupported data type for {device_id}/{property_name}: {type(msg.data)}"
)
# 所有 Bridge 对象都应具有 publish_device_status 方法;都会收到设备状态更新
if bChange:
for bridge in self.bridges:
if hasattr(bridge, "publish_device_status"):
bridge.publish_device_status(self.device_status, device_id, property_name)
if bCreate:
self.lab_logger().trace(f"Status created: {device_id}.{property_name} = {msg.data}")
else:
self.lab_logger().debug(f"Status updated: {device_id}.{property_name} = {msg.data}")
def send_goal(
self,
item: "QueueItem",
action_type: str,
action_kwargs: Dict[str, Any],
server_info: Optional[Dict[str, Any]] = None,
) -> None:
"""
向设备发送目标请求
Args:
action_type: 动作类型
action_kwargs: 动作参数
server_info: 服务器发送信息,包含发送时间戳等
"""
u = uuid.UUID(item.job_id)
device_id = item.device_id
action_name = item.action_name
if action_type.startswith("UniLabJsonCommand"):
if action_name.startswith("auto-"):
action_name = action_name[5:]
action_id = f"/devices/{device_id}/_execute_driver_command"
action_kwargs = {
"string": json.dumps(
{
"function_name": action_name,
"function_args": action_kwargs,
}
)
}
if action_type.startswith("UniLabJsonCommandAsync"):
action_id = f"/devices/{device_id}/_execute_driver_command_async"
else:
action_id = f"/devices/{device_id}/{action_name}"
if action_name == "test_latency" and server_info is not None:
self.server_latest_timestamp = server_info.get("send_timestamp", 0.0)
if action_id not in self._action_clients:
raise ValueError(f"ActionClient {action_id} not found.")
action_client: ActionClient = self._action_clients[action_id]
# 遍历action_kwargs下的所有子dict将"sample_uuid"的值赋给"sample_id"
def assign_sample_id(obj):
if isinstance(obj, dict):
if "sample_uuid" in obj:
obj["sample_id"] = obj["sample_uuid"]
obj.pop("sample_uuid")
for k,v in obj.items():
if k != "unilabos_extra":
assign_sample_id(v)
elif isinstance(obj, list):
for item in obj:
assign_sample_id(item)
assign_sample_id(action_kwargs)
goal_msg = convert_to_ros_msg(action_client._action_type.Goal(), action_kwargs)
self.lab_logger().info(f"[Host Node] Sending goal for {action_id}: {goal_msg}")
action_client.wait_for_server()
goal_uuid_obj = UUID(uuid=list(u.bytes))
future = action_client.send_goal_async(
goal_msg,
feedback_callback=lambda feedback_msg: self.feedback_callback(item, action_id, feedback_msg),
goal_uuid=goal_uuid_obj,
)
future.add_done_callback(lambda f: self.goal_response_callback(item, action_id, f))
def goal_response_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""目标响应回调"""
goal_handle = future.result()
if not goal_handle.accepted:
self.lab_logger().warning(f"[Host Node] Goal {item.action_name} ({item.job_id}) rejected")
return
self.lab_logger().info(f"[Host Node] Goal {action_id} ({item.job_id}) accepted")
self._goals[item.job_id] = goal_handle
goal_future = goal_handle.get_result_async()
goal_future.add_done_callback(
lambda f: self.get_result_callback(item, action_id, f)
)
goal_future.result()
def feedback_callback(self, item: "QueueItem", action_id: str, feedback_msg) -> None:
"""反馈回调"""
feedback_data = convert_from_ros_msg(feedback_msg)
feedback_data.pop("goal_id")
self.lab_logger().trace(f"[Host Node] Feedback for {action_id} ({item.job_id}): {feedback_data}")
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(feedback_data, item, "running")
def get_result_callback(self, item: "QueueItem", action_id: str, future) -> None:
"""获取结果回调"""
job_id = item.job_id
try:
result = future.result()
result_msg = result.result
goal_status = result.status
# 检查是否是被取消的任务
if goal_status == GoalStatus.STATUS_CANCELED:
self.lab_logger().info(f"[Host Node] Goal {action_id} ({job_id[:8]}) was cancelled")
status = "failed"
return_info = serialize_result_info("Job was cancelled", False, {})
else:
result_data = convert_from_ros_msg(result_msg)
status = "success"
return_info_str = result_data.get("return_info")
if return_info_str is not None:
try:
return_info = json.loads(return_info_str)
# 适配后端的一些额外处理
return_value = return_info.get("return_value")
if isinstance(return_value, dict):
unilabos_samples = return_info.get("unilabos_samples")
if isinstance(unilabos_samples, list):
return_info["unilabos_samples"] = unilabos_samples
suc = return_info.get("suc", False)
if not suc:
status = "failed"
except json.JSONDecodeError:
status = "failed"
return_info = serialize_result_info("", False, result_data)
self.lab_logger().critical("错误的return_info类型请断点修复")
else:
# 无 return_info 字段时,回退到 success 字段(若存在)
suc_field = result_data.get("success")
if isinstance(suc_field, bool):
status = "success" if suc_field else "failed"
return_info = serialize_result_info("", suc_field, result_data)
else:
# 最保守的回退标记失败并返回空JSON
status = "failed"
return_info = serialize_result_info("缺少return_info", False, result_data)
self.lab_logger().info(f"[Host Node] Result for {action_id} ({job_id[:8]}): {status}")
if goal_status != GoalStatus.STATUS_CANCELED:
self.lab_logger().debug(f"[Host Node] Result data: {result_data}")
# 清理 _goals 中的记录
if job_id in self._goals:
del self._goals[job_id]
self.lab_logger().debug(f"[Host Node] Removed goal {job_id[:8]} from _goals")
# 存储结果供 HTTP API 查询
try:
from unilabos.app.web.controller import store_job_result
if goal_status == GoalStatus.STATUS_CANCELED:
store_job_result(job_id, status, return_info, {})
else:
store_job_result(job_id, status, return_info, result_data)
except ImportError:
pass # controller 模块可能未加载
# 发布状态到桥接器
if job_id:
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
if goal_status == GoalStatus.STATUS_CANCELED:
bridge.publish_job_status({}, item, status, return_info)
else:
bridge.publish_job_status(result_data, item, status, return_info)
except Exception as e:
self.lab_logger().error(
f"[Host Node] Error in get_result_callback for {action_id} ({job_id[:8]}): {str(e)}"
)
import traceback
self.lab_logger().error(traceback.format_exc())
# 清理 _goals 中的记录
if job_id in self._goals:
del self._goals[job_id]
# 发布失败状态
for bridge in self.bridges:
if hasattr(bridge, "publish_job_status"):
bridge.publish_job_status(
{}, item, "failed", serialize_result_info(f"Callback error: {str(e)}", False, {})
)
def cancel_goal(self, goal_uuid: str) -> bool:
"""
取消目标
Args:
goal_uuid: 目标UUIDjob_id
Returns:
bool: 如果找到目标并发起取消请求返回True否则返回False
"""
if goal_uuid in self._goals:
self.lab_logger().info(f"[Host Node] Cancelling goal {goal_uuid[:8]}")
goal_handle = self._goals[goal_uuid]
# 发起异步取消请求
cancel_future = goal_handle.cancel_goal_async()
# 添加取消完成的回调
cancel_future.add_done_callback(lambda future: self._cancel_goal_callback(goal_uuid, future))
return True
else:
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid[:8]} not found in _goals, cannot cancel")
return False
def _cancel_goal_callback(self, goal_uuid: str, future) -> None:
"""取消目标的回调"""
try:
cancel_response = future.result()
if cancel_response.goals_canceling:
self.lab_logger().info(f"[Host Node] Goal {goal_uuid[:8]} cancel request accepted")
else:
self.lab_logger().warning(f"[Host Node] Goal {goal_uuid[:8]} cancel request rejected")
except Exception as e:
self.lab_logger().error(f"[Host Node] Error cancelling goal {goal_uuid[:8]}: {str(e)}")
import traceback
self.lab_logger().error(traceback.format_exc())
def get_goal_status(self, job_id: str) -> int:
"""获取目标状态"""
if job_id in self._goals:
g = self._goals[job_id]
status = g.status
self.lab_logger().debug(f"[Host Node] Goal status for {job_id}: {status}")
return status
self.lab_logger().warning(f"[Host Node] Goal {job_id} not found, status unknown")
return GoalStatus.STATUS_UNKNOWN
"""Controller Node"""
def initialize_controller(self, controller_id: str, controller_config: Dict[str, Any]) -> None:
"""
初始化控制器
Args:
controller_id: 控制器ID
controller_config: 控制器配置
"""
self.lab_logger().info(f"[Host Node] Initializing controller: {controller_id}")
class_name = controller_config.pop("type")
controller_func = globals()[class_name]
for input_name, input_info in controller_config["inputs"].items():
controller_config["inputs"][input_name]["type"] = get_msg_type(eval(input_info["type"]))
for output_name, output_info in controller_config["outputs"].items():
controller_config["outputs"][output_name]["type"] = get_msg_type(eval(output_info["type"]))
if controller_config["parameters"] is None:
controller_config["parameters"] = {}
controller = ControllerNode(controller_id, controller_func=controller_func, **controller_config)
self.lab_logger().info(f"[Host Node] Controller {controller_id} created.")
# rclpy.get_global_executor().add_node(controller)
"""Resource"""
def _init_host_service(self):
self._resource_services: Dict[str, Service] = {
"resource_add": self.create_service(
ResourceAdd, "/resources/add", self._resource_add_callback, callback_group=self.callback_group
),
"resource_get": self.create_service(
SerialCommand, "/resources/get", self._resource_get_callback, callback_group=self.callback_group
),
"resource_delete": self.create_service(
ResourceDelete,
"/resources/delete",
self._resource_delete_callback,
callback_group=self.callback_group,
),
"resource_update": self.create_service(
ResourceUpdate,
"/resources/update",
self._resource_update_callback,
callback_group=self.callback_group,
),
"resource_list": self.create_service(
ResourceList, "/resources/list", self._resource_list_callback, callback_group=self.callback_group
),
"node_info_update": self.create_service(
SerialCommand,
"/node_info_update",
self._node_info_update_callback,
callback_group=self.callback_group,
),
"c2s_update_resource_tree": self.create_service(
SerialCommand,
"/c2s_update_resource_tree",
self._resource_tree_update_callback,
callback_group=self.callback_group,
),
}
async def _resource_tree_action_add_callback(self, data: dict, response: SerialCommand_Response): # OK
resource_tree_set = ResourceTreeSet.load(data["data"])
mount_uuid = data["mount_uuid"]
first_add = data["first_add"]
self.lab_logger().info(
f"[Host Node-Resource] Loaded ResourceTreeSet with {len(resource_tree_set.trees)} trees, "
f"{len(resource_tree_set.all_nodes)} total nodes"
)
# 处理资源添加逻辑
success = False
uuid_mapping = {}
if len(self.bridges) > 0:
from unilabos.app.web.client import HTTPClient, http_client
resource_start_time = time.time()
uuid_mapping = http_client.resource_tree_add(resource_tree_set, mount_uuid, first_add)
success = True
resource_end_time = time.time()
self.lab_logger().info(
f"[Host Node-Resource] 物料创建上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms"
)
if uuid_mapping:
self.lab_logger().info(f"[Host Node-Resource] UUID映射: {len(uuid_mapping)} 个节点")
if success:
from unilabos.resources.graphio import physical_setup_graph
# 将资源添加到本地图中
for node in resource_tree_set.all_nodes:
resource_dict = node.res_content.model_dump(by_alias=True)
if resource_dict.get("id") not in physical_setup_graph.nodes:
physical_setup_graph.add_node(resource_dict["id"], **resource_dict)
else:
physical_setup_graph.nodes[resource_dict["id"]]["data"].update(resource_dict.get("data", {}))
response.response = json.dumps(uuid_mapping) if success else "FAILED"
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
async def _resource_tree_action_get_callback(self, data: dict, response: SerialCommand_Response): # OK
uuid_list: List[str] = data["data"]
with_children: bool = data["with_children"]
from unilabos.app.web.client import http_client
resource_response = http_client.resource_tree_get(uuid_list, with_children)
response.response = json.dumps(resource_response)
async def _resource_tree_action_remove_callback(self, data: dict, response: SerialCommand_Response):
"""
子节点通知Host物料树删除
"""
self.lab_logger().info(f"[Host Node-Resource] Resource tree remove request received")
response.response = "OK"
self.lab_logger().info(f"[Host Node-Resource] Resource tree remove completed")
async def _resource_tree_action_update_callback(self, data: dict, response: SerialCommand_Response):
"""
子节点通知Host物料树更新
"""
resource_tree_set = ResourceTreeSet.load(data["data"])
self.lab_logger().info(
f"[Host Node-Resource] Loaded ResourceTreeSet with {len(resource_tree_set.trees)} trees, "
f"{len(resource_tree_set.all_nodes)} total nodes"
)
from unilabos.app.web.client import http_client
uuid_to_trees: Dict[str, List[ResourceTreeInstance]] = collections.defaultdict(list)
for tree in resource_tree_set.trees:
uuid_to_trees[tree.root_node.res_content.parent_uuid].append(tree)
for uid, trees in uuid_to_trees.items():
new_tree_set = ResourceTreeSet(trees)
resource_start_time = time.time()
self.lab_logger().info(
f"[Host Node-Resource] 物料 {[root_node.res_content.id for root_node in new_tree_set.root_nodes]} {uid} 挂载 {trees[0].root_node.res_content.parent_uuid} 请求更新上传"
)
uuid_mapping = http_client.resource_tree_add(new_tree_set, uid, False)
success = bool(uuid_mapping)
resource_end_time = time.time()
self.lab_logger().info(
f"[Host Node-Resource] 物料更新上传 {round(resource_end_time - resource_start_time, 5) * 1000} ms"
)
if uuid_mapping:
self.lab_logger().info(f"[Host Node-Resource] UUID映射: {len(uuid_mapping)} 个节点")
# 还需要加入到资源图中,暂不实现,考虑资源图新的获取方式
response.response = json.dumps(uuid_mapping)
self.lab_logger().info(f"[Host Node-Resource] Resource tree add completed, success: {success}")
async def _resource_tree_update_callback(self, request: SerialCommand_Request, response: SerialCommand_Response):
"""
子节点通知Host物料树更新
接收序列化的 ResourceTreeSet 数据并进行处理
"""
self.lab_logger().info(f"[Host Node-Resource] Resource tree add request received")
try:
# 解析请求数据
data = json.loads(request.command)
action = data["action"]
data = data["data"]
if action == "add":
await self._resource_tree_action_add_callback(data, response)
elif action == "get":
await self._resource_tree_action_get_callback(data, response)
elif action == "update":
await self._resource_tree_action_update_callback(data, response)
elif action == "remove":
await self._resource_tree_action_remove_callback(data, response)
else:
self.lab_logger().error(f"[Host Node-Resource] Invalid action: {action}")
response.response = "ERROR"
except Exception as e:
self.lab_logger().error(f"[Host Node-Resource] Error adding resource tree: {e}")
self.lab_logger().error(traceback.format_exc())
response.response = f"ERROR: {str(e)}"
return response
def _node_info_update_callback(self, request, response):
"""
更新节点信息回调
"""
# self.lab_logger().info(f"[Host Node] Node info update request received: {request}")
try:
from unilabos.app.communication import get_communication_client
from unilabos.app.web.client import HTTPClient, http_client
info = json.loads(request.command)
if "SYNC_SLAVE_NODE_INFO" in info:
info = info["SYNC_SLAVE_NODE_INFO"]
machine_name = info["machine_name"]
edge_device_id = info["edge_device_id"]
self.device_machine_names[edge_device_id] = machine_name
else:
devices_config = info.pop("devices_config")
registry_config = info.pop("registry_config")
if registry_config:
http_client.resource_registry({"resources": registry_config})
self.lab_logger().debug(f"[Host Node] Node info update: {info}")
response.response = "OK"
except Exception as e:
self.lab_logger().error(f"[Host Node] Error updating node info: {e.args}")
response.response = "ERROR"
return response
def _resource_add_callback(self, request, response):
"""
添加资源回调
处理添加资源请求,将资源数据传递到桥接器
Args:
request: 包含资源数据的请求对象
response: 响应对象
Returns:
响应对象,包含操作结果
"""
resources = [convert_from_ros_msg(resource) for resource in request.resources]
self.lab_logger().info(f"[Host Node-Resource] Add request received: {len(resources)} resources")
success = False
if len(self.bridges) > 0: # 边的提交待定
from unilabos.app.web.client import HTTPClient, http_client
r = http_client.resource_add(add_schema(resources))
success = bool(r)
response.success = success
if success:
from unilabos.resources.graphio import physical_setup_graph
for resource in resources:
if resource.get("id") not in physical_setup_graph.nodes:
physical_setup_graph.add_node(resource["id"], **resource)
else:
physical_setup_graph.nodes[resource["id"]]["data"].update(resource["data"])
self.lab_logger().info(f"[Host Node-Resource] Add request completed, success: {success}")
return response
def _resource_get_process(self, data: Dict[str, Any]):
r = data["data"]
self.lab_logger().debug(f"[Host Node-Resource] Retrieved from bridge: {len(r)} resources")
resources = [convert_to_ros_msg(Resource, resource) for resource in r]
return resources
def _resource_get_callback(self, request: SerialCommand.Request, response: SerialCommand.Response):
"""
获取资源回调
处理获取资源请求,从桥接器或本地查询资源数据
Args:
request: 包含资源ID的请求对象
response: 响应对象
Returns:
响应对象,包含查询到的资源
"""
try:
from unilabos.app.web import http_client
data = json.loads(request.command)
if "uuid" in data and data["uuid"] is not None:
http_req = http_client.resource_tree_get([data["uuid"]], data["with_children"])
elif "id" in data and data["id"].startswith("/"):
http_req = http_client.resource_get(data["id"], data["with_children"])
else:
raise ValueError("没有使用正确的物料 id 或 uuid")
response.response = json.dumps(http_req["data"])
return response
except Exception as e:
self.lab_logger().error(f"[Host Node-Resource] Error retrieving from bridge: {str(e)}")
return response
def _resource_delete_callback(self, request, response):
"""
删除资源回调
处理删除资源请求,将删除指令传递到桥接器
Args:
request: 包含资源ID的请求对象
response: 响应对象
Returns:
响应对象,包含操作结果
"""
self.lab_logger().info(f"[Host Node-Resource] Delete request for ID: {request.id}")
success = False
if len(self.bridges) > 0:
try:
r = self.bridges[-1].resource_delete(request.id)
success = bool(r)
except Exception as e:
self.lab_logger().error(f"[Host Node-Resource] Error deleting resource: {str(e)}")
response.success = success
self.lab_logger().info(f"[Host Node-Resource] Delete request completed, success: {success}")
return response
def _resource_update_callback(self, request, response):
"""
更新资源回调
处理更新资源请求,将更新指令传递到桥接器
Args:
request: 包含资源数据的请求对象
response: 响应对象
Returns:
响应对象,包含操作结果
"""
resources = [convert_from_ros_msg(resource) for resource in request.resources]
self.lab_logger().info(f"[Host Node-Resource] Update request received: {len(resources)} resources")
success = False
if len(self.bridges) > 0:
try:
r = self.bridges[-1].resource_update(add_schema(resources))
success = bool(r)
except Exception as e:
self.lab_logger().error(f"[Host Node-Resource] Error updating resources: {str(e)}")
response.success = success
self.lab_logger().info(f"[Host Node-Resource] Update request completed, success: {success}")
return response
def _resource_list_callback(self, request, response):
"""
列出资源回调
处理列出资源请求,返回所有可用资源
Args:
request: 请求对象
response: 响应对象
Returns:
响应对象,包含资源列表
"""
self.lab_logger().info(f"[Host Node-Resource] List request received")
# 这里可以实现返回资源列表的逻辑
self.lab_logger().debug(f"[Host Node-Resource] List parameters: {request}")
return response
def test_latency(self):
"""
测试网络延迟的action实现
通过5次ping-pong机制校对时间误差并计算实际延迟
"""
import uuid as uuid_module
self.lab_logger().info("=" * 60)
self.lab_logger().info("开始网络延迟测试...")
# 记录任务开始执行的时间
task_start_time = time.time()
# 进行5次ping-pong测试
ping_results = []
for i in range(5):
self.lab_logger().info(f"{i+1}/5次ping-pong测试...")
# 生成唯一的ping ID
ping_id = str(uuid_module.uuid4())
# 记录发送时间
send_timestamp = time.time()
# 发送ping
from unilabos.app.communication import get_communication_client
comm_client = get_communication_client()
comm_client.send_ping(ping_id, send_timestamp)
# 等待pong响应
timeout = 10.0
start_wait_time = time.time()
while time.time() - start_wait_time < timeout:
with self._ping_lock:
if ping_id in self._ping_responses:
pong_data = self._ping_responses.pop(ping_id)
break
time.sleep(0.001)
else:
self.lab_logger().error(f"❌ 第{i+1}次测试超时")
continue
# 计算本次测试结果
receive_timestamp = time.time()
client_timestamp = pong_data["client_timestamp"]
server_timestamp = pong_data["server_timestamp"]
# 往返时间
rtt_ms = (receive_timestamp - send_timestamp) * 1000
# 客户端与服务端时间差(客户端时间 - 服务端时间)
# 假设网络延迟对称,取中间点的服务端时间
mid_point_time = send_timestamp + (receive_timestamp - send_timestamp) / 2
time_diff_ms = (mid_point_time - server_timestamp) * 1000
ping_results.append({"rtt_ms": rtt_ms, "time_diff_ms": time_diff_ms})
self.lab_logger().info(f"✅ 第{i+1}次: 往返时间={rtt_ms:.2f}ms, 时间差={time_diff_ms:.2f}ms")
time.sleep(0.1)
if not ping_results:
self.lab_logger().error("❌ 所有ping-pong测试都失败了")
return {"status": "all_timeout"}
# 统计分析
rtts = [r["rtt_ms"] for r in ping_results]
time_diffs = [r["time_diff_ms"] for r in ping_results]
avg_rtt_ms = sum(rtts) / len(rtts)
avg_time_diff_ms = sum(time_diffs) / len(time_diffs)
max_time_diff_error_ms = max(abs(min(time_diffs)), abs(max(time_diffs)))
self.lab_logger().info("-" * 50)
self.lab_logger().info("[测试统计]")
self.lab_logger().info(f"有效测试次数: {len(ping_results)}/5")
self.lab_logger().info(f"平均往返时间: {avg_rtt_ms:.2f}ms")
self.lab_logger().info(f"平均时间差: {avg_time_diff_ms:.2f}ms")
self.lab_logger().info(f"时间差范围: {min(time_diffs):.2f}ms ~ {max(time_diffs):.2f}ms")
self.lab_logger().info(f"最大时间误差: ±{max_time_diff_error_ms:.2f}ms")
# 计算任务执行延迟
if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0:
self.lab_logger().info("-" * 50)
self.lab_logger().info("[任务执行延迟分析]")
self.lab_logger().info(f"服务端任务下发时间: {self.server_latest_timestamp:.6f}")
self.lab_logger().info(f"客户端任务开始时间: {task_start_time:.6f}")
# 原始时间差(不考虑时间同步误差)
raw_delay_ms = (task_start_time - self.server_latest_timestamp) * 1000
# 考虑时间同步误差后的延迟(用平均时间差校正)
corrected_delay_ms = raw_delay_ms - avg_time_diff_ms
self.lab_logger().info(f"📊 原始时间差: {raw_delay_ms:.2f}ms")
self.lab_logger().info(f"🔧 时间同步校正: {avg_time_diff_ms:.2f}ms")
self.lab_logger().info(f"⏰ 实际任务延迟: {corrected_delay_ms:.2f}ms")
self.lab_logger().info(f"📏 误差范围: ±{max_time_diff_error_ms:.2f}ms")
# 给出延迟范围
min_delay = corrected_delay_ms - max_time_diff_error_ms
max_delay = corrected_delay_ms + max_time_diff_error_ms
self.lab_logger().info(f"📋 延迟范围: {min_delay:.2f}ms ~ {max_delay:.2f}ms")
else:
self.lab_logger().warning("⚠️ 无法获取服务端任务下发时间,跳过任务延迟分析")
raw_delay_ms = -1
corrected_delay_ms = -1
self.lab_logger().info("=" * 60)
return {
"avg_rtt_ms": avg_rtt_ms,
"avg_time_diff_ms": avg_time_diff_ms,
"max_time_error_ms": max_time_diff_error_ms,
"task_delay_ms": corrected_delay_ms if corrected_delay_ms > 0 else -1,
"raw_delay_ms": (
raw_delay_ms if hasattr(self, "server_latest_timestamp") and self.server_latest_timestamp > 0 else -1
),
"test_count": len(ping_results),
"status": "success",
}
def test_resource(
self, resource: ResourceSlot, resources: List[ResourceSlot], device: DeviceSlot, devices: List[DeviceSlot]
) -> TestResourceReturn:
return {
"resources": ResourceTreeSet.from_plr_resources([resource, *resources]).dump(),
"devices": [device, *devices],
}
def handle_pong_response(self, pong_data: dict):
"""
处理pong响应
"""
ping_id = pong_data.get("ping_id")
if ping_id:
with self._ping_lock:
self._ping_responses[ping_id] = pong_data
# 详细信息合并为一条日志
client_timestamp = pong_data.get("client_timestamp", 0)
server_timestamp = pong_data.get("server_timestamp", 0)
current_time = time.time()
self.lab_logger().debug(
f"📨 Pong | ID:{ping_id[:8]}.. | C→S→C: {client_timestamp:.3f}{server_timestamp:.3f}{current_time:.3f}"
)
else:
self.lab_logger().warning("⚠️ 收到无效的Pong响应缺少ping_id")
def notify_resource_tree_update(self, device_id: str, action: str, resource_uuid_list: List[str]) -> bool:
"""
通知设备节点更新资源树
Args:
device_id: 目标设备ID
action: 操作类型 "add", "update", "remove"
resource_uuid_list: 资源UUIDs
Returns:
bool: 操作是否成功
"""
try:
# 检查设备是否存在
if device_id not in self.devices_names:
self.lab_logger().error(f"[Host Node-Resource] Device {device_id} not found in devices_names")
return False
namespace = self.devices_names[device_id]
device_key = f"{namespace}/{device_id}"
# 检查设备是否在线
if device_key not in self._online_devices:
self.lab_logger().error(f"[Host Node-Resource] Device {device_key} is offline")
return False
# 构建服务地址
srv_address = f"/srv{namespace}/s2c_resource_tree"
self.lab_logger().info(f"[Host Node-Resource] Notifying {device_id} for resource tree {action} operation")
# 创建服务客户端
sclient = self.create_client(SerialCommand, srv_address)
# 等待服务可用(设置超时)
if not sclient.wait_for_service(timeout_sec=5.0):
self.lab_logger().error(f"[Host Node-Resource] Service {srv_address} not available")
return False
# 构建请求数据
request_data = [
{
"action": action,
"data": resource_uuid_list,
}
]
# 创建请求
request = SerialCommand.Request()
request.command = json.dumps(request_data, ensure_ascii=False)
# 发送异步请求
future = sclient.call_async(request)
# 等待响应
timeout = 30.0
start_time = time.time()
while not future.done():
if time.time() - start_time > timeout:
self.lab_logger().error(f"[Host Node-Resource] Timeout waiting for response from {device_id}")
return False
time.sleep(0.05)
response = future.result()
self.lab_logger().info(
f"[Host Node-Resource] Resource tree {action} notification completed for {device_id}"
)
return True
except Exception as e:
self.lab_logger().error(f"[Host Node-Resource] Error notifying resource tree update: {str(e)}")
self.lab_logger().error(traceback.format_exc())
return False