Uni-Lab-OS/unilabos/device_comms/opcua_client/client.py
ZiWei 5dc81ec9be bump version to 0.10.3
update registry

do not modify axis globally

Prcxi9320 (#207)

* 0.10.7 Update (#101)

* Clean up registry to make it easier to understand (#76)

* delete deprecated mock devices

* rename categories

* combine chromatographic devices

* rename rviz simulation nodes

* organic virtual devices

* parse vessel_id

* run registry completion before merge

---------

Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>

* fix: workstation handlers and vessel_id parsing

* fix: working dir error when a config path is given
feat: report the publish topic on error

* modify default discovery_interval to 15s

* feat: add trace log level

* feat: add ChinWe device control class with serial communication and motor control support (#79)

* fix: drop_tips not using auto resource select

* fix: discard_tips error

* fix: discard_tips

* fix: prcxi_res

* add: prcxi res
fix: startup slow

* feat: workstation example

* fix pumps and liquid_handler handle

* feat: improve protocol node runtime logging

* fix all protocol_compilers and remove deprecated devices

* feat: add the use_remote_resource parameter

* fix and remove redundant info

* bugfixes on organic protocols

* fix filter protocol

* fix protocol node

* temporary compatibility for incorrect driver implementations

* fix: prcxi import error

* use call_async in all service to avoid deadlock

* fix: figure_resource

* Update recipe.yaml

* add workstation template and battery example

* feat: add sk & ak

* update workstation base

* Create workstation_architecture.md

* refactor: workstation_base now contains only business logic; communication and sub-device management are delegated to ProtocolNode

* refactor: ProtocolNode→WorkstationNode

* Add: msgs.action (#83)

* update: Workstation dev bump version from 0.10.3 to 0.10.4 (#84)

* Add: msgs.action

* update: bump version from 0.10.3 to 0.10.4

* simplify resource system

* uncompleted refactor

* example for using WorkstationBase

* feat: websocket

* feat: websocket test

* feat: workstation example

* feat: action status

* fix: registration error for the station's own methods

* fix: restore the protocol node handler methods

* fix: build

* fix: missing job_id key

* ws test version 1

* ws test version 2

* ws protocol

* add logging for material-relation uploads

* add logging for material-relation uploads

* fix material-relation upload

* fix broken tracker instance tracking in the workstation

* add handle detection; upload material edge relations

* fix event loop error

* fix edge reporting error

* fix async error

* update the schema title field

* support auto-refresh of host node info

* registry editor

* fix corrupted messages when status is sent at high frequency

* add addr parameter

* fix: addr param

* fix: addr param

* drop labid and the mandatory config input

* Add action definitions for LiquidHandlerSetGroup and LiquidHandlerTransferGroup

- Created LiquidHandlerSetGroup.action with fields for group name, wells, and volumes.
- Created LiquidHandlerTransferGroup.action with fields for source and target group names and unit volume.
- Both actions include response fields for return information and success status.

* Add LiquidHandlerSetGroup and LiquidHandlerTransferGroup actions to CMakeLists

* Add set_group and transfer_group methods to PRCXI9300Handler and update liquid_handler.yaml

* change result_info to a dict type

* add UAT address substitution

* runze multiple pump support

(cherry picked from commit 49354fcf39)

* remove runze multiple software obtainer

(cherry picked from commit 8bcc92a394)

* support multiple backbone

(cherry picked from commit 4771ff2347)

* Update runze pump format

* Correct runze multiple backbone

* Update runze_multiple_backbone

* Correct runze pump multiple receive method.

* Correct runze pump multiple receive method.

* transfer_group for PRCXI9320: one-to-many and many-to-many

* remove MQTT, update the launch docs, provide sample registry files, bump to 0.10.5

* fix import error

* fix dupe upload registry

* refactor ws client

* add server timeout

* Fix: run-column with correct vessel id (#86)

* fix run_column

* Update run_column_protocol.py

(cherry picked from commit e5aa4d940a)

* resource_update use resource_add

* add a slot recommendation feature

* redefine the input parameters for slot recommendation

* update registry with nested obj

* fix protocol node log_message, added create_resource return value

* fix protocol node log_message, added create_resource return value

* try fix add protocol

* fix resource_add

* fix the liquid handling station's incorrect aspirate registry

* Feature/xprbalance-zhida (#80)

* feat(devices): add Zhida GC/MS pretreatment automation workstation

* feat(devices): add mettler_toledo xpr balance

* balance

* re-complete the zhida registry

* PRCXI9320 json

* PRCXI9320 json

* PRCXI9320 json

* fix resource download

* remove class for resource

* bump version to 0.10.6

* update all registries

* fix protocolnode compatibility

* fix protocolnode compatibility

* Update install md

* Add Defaultlayout

* update the material interface

* fix dict to tree/nested-dict converter

* coin_cell_station draft

* refactor: rename "station_resource" to "deck"

* add standardized BIOYOND resources: bottle_carrier, bottle

* refactor and add BIOYOND resources tests

* add BIOYOND deck assignment and pass all tests

* fix: update resource with correct structure; remove deprecated liquid_handler set_group action

* feat: merge the Neware battery test system driver and config files into workstation_dev_YB2 (#92)

* feat: Neware battery test system driver and registry files

* feat: bring neware driver & battery.json into workstation_dev_YB2

* add bioyond studio draft

* bioyond station with communication init and resource sync

* fix bioyond station and registry

* fix: update resource with correct structure; remove deprecated liquid_handler set_group action

* frontend_docs

* create/update resources with POST/PUT for big amount/ small amount data

* create/update resources with POST/PUT for big amount/ small amount data

* refactor: add itemized_carrier instead of a carrier consisting of ResourceHolder

* create warehouse by factory func

* update bioyond launch json

* add child_size for itemized_carrier

* fix bioyond resource io

* Workstation templates: Resources and its CRUD, and workstation tasks (#95)

* coin_cell_station draft

* refactor: rename "station_resource" to "deck"

* add standardized BIOYOND resources: bottle_carrier, bottle

* refactor and add BIOYOND resources tests

* add BIOYOND deck assignment and pass all tests

* fix: update resource with correct structure; remove deprecated liquid_handler set_group action

* feat: merge the Neware battery test system driver and config files into workstation_dev_YB2 (#92)

* feat: Neware battery test system driver and registry files

* feat: bring neware driver & battery.json into workstation_dev_YB2

* add bioyond studio draft

* bioyond station with communication init and resource sync

* fix bioyond station and registry

* create/update resources with POST/PUT for big amount/ small amount data

* refactor: add itemized_carrier instead of a carrier consisting of ResourceHolder

* create warehouse by factory func

* update bioyond launch json

* add child_size for itemized_carrier

* fix bioyond resource io

---------

Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com>
Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com>

* update the material interface

* Workstation dev yb2 (#100)

* Refactor and extend reaction station action messages

* Refactor dispensing station tasks to enhance parameter clarity and add batch processing capabilities

- Updated `create_90_10_vial_feeding_task` to include detailed parameters for 90%/10% vial feeding, improving clarity and usability.
- Introduced `create_batch_90_10_vial_feeding_task` for batch processing of 90%/10% vial feeding tasks with JSON formatted input.
- Added `create_batch_diamine_solution_task` for batch preparation of diamine solution, also utilizing JSON formatted input.
- Refined `create_diamine_solution_task` to include additional parameters for better task configuration.
- Enhanced schema descriptions and default values for improved user guidance.

* fix to_plr_resources

* add update remove

* support auto-generated selector registries
support material transfer

* fix resource adding

* fix transfer_resource_to_another generation

* update transfer_resource_to_another parameters; support spot as an input

* add test_resource action

* fix host_node error

* fix host_node test_resource error

* fix host_node test_resource error

* filter local actions

* move internal actions for host node compatibility

* fix bug where sync task errors were not displayed

* feat: allow returning materials that do not belong to this node; they can be distinguished later via decoration, so no warning is raised

* update todo

* modify bioyond/plr converter, bioyond resource registry, and tests

* pass the tests

* update todo

* add conda-pack-build.yml

* add auto install script for conda-pack-build.yml

(cherry picked from commit 172599adcf)

* update conda-pack-build.yml

* update conda-pack-build.yml

* update conda-pack-build.yml

* update conda-pack-build.yml

* update conda-pack-build.yml

* Add version in __init__.py
Update conda-pack-build.yml
Add create_zip_archive.py

* Update conda-pack-build.yml

* Update conda-pack-build.yml (with mamba)

* Update conda-pack-build.yml

* Fix FileNotFoundError

* Try fix 'charmap' codec can't encode characters in position 16-23: character maps to <undefined>

* Fix unilabos msgs search error

* Fix environment_check.py

* Update recipe.yaml

* Update registry. Update uuid loop figure method. Update install docs.

* Fix nested conda pack

* Fix one-key installation path error

* Bump version to 0.10.7

* Workshop bj (#99)

* Add LaiYu Liquid device integration and tests

Introduce LaiYu Liquid device implementation, including backend, controllers, drivers, configuration, and resource files. Add hardware connection, tip pickup, and simplified test scripts, as well as experiment and registry configuration for LaiYu Liquid. Documentation and .gitignore for the device are also included.

* feat(LaiYu_Liquid): restructure the device module and add hardware docs

refactor: reorganize the LaiYu_Liquid module directory structure
docs: add SOPA pipette and stepper-motor control command docs
fix: correct the default max volume in the device config
test: add workbench configuration test cases
chore: remove outdated test scripts and config files

* add

* refactor: rename LaiYu_Liquid.py to laiyu_liquid_main.py and update all import references

- Renamed LaiYu_Liquid.py to laiyu_liquid_main.py via git mv
- Updated import references in all affected files
- Behavior unchanged; only naming consistency improved
- Verified that all imports still work

* fix: export LaiYuLiquidBackend from core/__init__.py

- Added LaiYuLiquidBackend to the import list
- Added LaiYuLiquidBackend to the __all__ export list
- Ensured all major classes can be imported correctly

* fix folder-name casing

* upload the battery assembly workstation secondary-development tutorial (with TOC) to dev (#94)

* battery assembly workstation secondary-development tutorial

* Update intro.md

* materials tutorial

* update materials tutorial; JSON-format comments

* Update prcxi driver & fix transfer_liquid mix_times (#90)

* Update prcxi driver & fix transfer_liquid mix_times

* fix: correct mix_times type

* Update liquid_handler registry

* test: prcxi.py

* Update registry from pr

* fix one-key script not existing

* clean files

---------

Co-authored-by: Junhan Chang <changjh@dp.tech>
Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>
Co-authored-by: Guangxin Zhang <guangxin.zhang.bio@gmail.com>
Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com>
Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com>
Co-authored-by: LccLink <1951855008@qq.com>
Co-authored-by: lixinyu1011 <61094742+lixinyu1011@users.noreply.github.com>
Co-authored-by: shiyubo0410 <shiyubo@dp.tech>

* fix startup env check.
add auto install during one-key installation

* Try fix one-key build on linux

* Complete all one key installation

* fix: rename schema field to resource_schema with serialization and validation aliases (#104)

Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>

* Fix one-key installation build

Install conda-pack before pack command

Add conda-pack to base when building one-key installer

Fix param error when using mamba run

Try fix one-key build on linux

* Fix conda pack on windows

* add plr_to_bioyond, and refactor bioyond stations

* modify default config

* Fix one-key installation build for windows

* Fix workstation startup
Update registry

* Fix/resource UUID and doc fix (#109)

* Fix ResourceTreeSet load error

* Raise error when using unsupported type to create ResourceTreeSet

* Fix children key error

* Fix children key error

* Fix workstation resource not tracking

* Fix workstation deck & children resource dupe

* Fix workstation deck & children resource dupe

* Fix multiple resource error

* Fix resource tree update

* Fix resource tree update

* Force confirm uuid

* Tip more error log

* Refactor Bioyond workstation and experiment workflow (#105)

Refactored the Bioyond workstation classes to improve parameter handling and workflow management. Updated experiment.py to use BioyondReactionStation with deck and material mappings, and enhanced workflow step parameter mapping and execution logic. Adjusted JSON experiment configs, improved workflow sequence handling, and added UUID assignment to PLR materials. Removed unused station_config and material cache logic, and added detailed docstrings and debug output for workflow methods.

* Fix resource get.
Fix resource parent not found.
Mapping uuid for all resources.

* mount parent uuid

* Add logging configuration based on BasicConfig in main function

* fix workstation node error

* fix workstation node error

* Update boot example

* temp fix for resource get

* temp fix for resource get

* provide error info when cant find plr type

* pack repo info

* fix to plr type error

* fix to plr type error

* Update regular container method

* support no size init

* fix comprehensive_station.json

* fix comprehensive_station.json

* fix type conversion

* fix state loading for regular container

* Update deploy-docs.yml

* Update deploy-docs.yml

---------

Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>

* Close #107
Update doc url.

* Fix/update resource (#112)

* cancel upload_registry

* Refactor Bioyond workstation and experiment workflow -fix (#111)

* refactor(bioyond_studio): improve material cache loading and parameter validation

Improve material cache loading to support multiple material types and detailed material handling
Change workflow parameter validation field names from key/value to Key/DisplayValue
Remove the unused merge_workflow_with_parameters method
Add a get_station_info method returning basic workstation info
Clean up commented-out code in the experiment files and update import paths

* fix: check the parent resource when removing a resource

In BaseROS2DeviceNode, check whether the parent resource is None before removing a resource to avoid a null-pointer error
Also update the Bottle and BottleCarrier classes to accept **kwargs
Fix the casing of Liquid_feeding_beaker in the test files

* correct return message

---------

Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>

* fix resource_get in action

* fix(reaction_station): clear the workflow sequence and parameters to avoid repeated execution (#113)

Clear the workflow sequence and parameters after creating a task so they do not accumulate on the next run

* Update create_resource device_id

* Update ResourceTracker

add more enumeration in POSE

fix converter in resource_tracker

* Update graphio together with workstation design.

fix(reaction_station): add a Value field to step parameters passed to the BY backend

fix(bioyond/warehouses): correct warehouse dimensions and item layout parameters

Adjust the item counts along the x and z axes and the item size parameters to match the 4x1x4 spec

fix warehouse serialize/deserialize

fix bioyond converter

fix itemized_carrier.unassign_child_resource

allow not-loaded MSG in registry

add layout serializer & converter

warehouse uses A1-D4; add warehouse layout

fix(graphio): correct the coordinate calculation in bioyond-to-plr resource conversion

Fix resource assignment and type mapping issues

Corrects resource assignment in ItemizedCarrier by using the correct spot key from _ordering. Updates graphio to use 'typeName' instead of 'name' for type mapping in resource_bioyond_to_plr. Renames DummyWorkstation to BioyondWorkstation in workstation_http_service for clarity.

* Update workstation & bioyond example

Refine descriptions in Bioyond reaction station YAML

Updated and clarified field and operation descriptions in the reaction_station_bioyond.yaml file for improved accuracy and consistency. Changes include more precise terminology, clearer parameter explanations, and standardized formatting for operation schemas.

refactor(workstation): update reaction station parameter descriptions and add a dispensing station config

Correct the reaction station method parameter descriptions for accuracy and clarity
Add the bioyond_dispensing_station.yaml config file

add create_workflow script and test

add invisible_slots to carriers

fix(warehouses): correct the size parameters of the bioyond_warehouse_1x4x4 warehouse

Adjust num_items_x and num_items_z to match the actual layout and update the item size parameters

save resource get data. allow empty value for layout and cross_section_type

More decks&plates support for bioyond (#115)

refactor(registry): restructure the reaction station device config, simplifying and updating operation commands

Remove the old automatic operation commands and add command configs for specific chemistry operations
Update module paths and config structure; improve parameter definitions and descriptions

fix(dispensing_station): correct the material info query call

Call material_id_query through hardware_interface instead of directly, to conform to the interface design

* PRCXI Update

modify prcxi wiring

prcxi example diagram

Create example_prcxi.json

* Update resource extra & uuid.

use ordering to convert identifier to idx

convert identifier to site idx

correct extra key

update extra before transfer

fix multiple instance error

add resource_tree_transfer func

fix itemized carrier assign child resource

support internal device material transfer

remove extra key

use same callback group

support material extra

support material extra
support update_resource_site in extra

* Update workstation.

modify workstation_architecture docs

bioyond_HR (#133)

* feat: Enhance Bioyond synchronization and resource management

- Implemented synchronization for all material types (consumables, samples, reagents) from Bioyond, logging detailed information for each type.
- Improved error handling and logging during synchronization processes.
- Added functionality to save Bioyond material IDs in UniLab resources for future updates.
- Enhanced the `sync_to_external` method to handle material movements correctly, including querying and creating materials in Bioyond.
- Updated warehouse configurations to support new storage types and improved layout for better resource management.
- Introduced new resource types such as reactors and tip boxes, with detailed specifications.
- Modified warehouse factory to support column offsets for naming conventions (e.g., A05-D08).
- Improved resource tracking by merging extra attributes instead of overwriting them.
- Added a new method for updating resources in Bioyond, ensuring better synchronization of resource changes.

* feat: add TipBox and Reactor configs to bottles.yaml

* fix: fix volume parameter handling in the liquid feeding method

Fix the volume parameter handling in the solid_feeding_vials method and refine the conditions for using the solvents parameter

Update the liquid feeding method to compute volume automatically from solvent info; add the solvents parameter and update the docs

Add batch creation methods for vial and solution tasks

Add batch creation of 90%/10% vial feeding tasks and diamine solution preparation tasks; update related parameters and defaults

* interfaces for the plate sealer, seal remover, and consumable station

* add Raman and XRD related code

* Resource update & asyncio fix

correct bioyond config

prcxi example

fix append_resource

fix regularcontainer

fix cancel error

fix resource_get param

fix json dumps

support name change during materials change

enable slave mode

change uuid logger to trace level

correct remove_resource stats

disable slave connect websocket

adjust with_children param

modify devices to use correct executor (sleep, create_task)

support sleep and create_task in node

fix run async execution error

* bump version to 0.10.9

update registry

* PRCXI Reset Error Correction (#166)

* change 9320 deck row number to 4

* Updated 9320 host address

* Updated 9320 host address

* Add **kwargs in classes: PRCXI9300Deck and PRCXI9300Container

* Removed all sample_id in prcxi_9320.json to avoid KeyError

* 9320 machine testing settings

* Typo

* Rewrite setup logic to clear error code

* initialize the step_mode attribute

* 1114 materials manual definition tutorial by xinyu (#165)

* Yibin Bioyond workstation deck front end by Xinyu

* materials building tutorial by xinyu

* 1114 materials manual definition tutorial

* 3d sim (#97)

* modify lh JSON startup

* modify lh JSON startup

* modify the backend into a generic sim backend

* modify YAML paths; adapt the 3D models to the web production environment

* add laiyu hardware connection

* modify the pipette state detection method

Modify the pipette state detection method,
add conversion between the three axes' calibration points and zero points,
add a backend for real three-axis movement

* modify the laiyu liquid handling station

Simplify the movement methods,
remove the software position limits,
fix the issue that the Z axis had to be re-homed whenever it was used

* update lh and the laiyu workshop

1. Other liquid handling stations can now be adapted just by swapping the backend; the main class stays LiquidHandler and needs no rewrite.

2. Tip detection now relies on the tip itself rather than the class.

3. Homing parameters are now in millimeters for easier manual adjustment.

4. Homing reworked: mechanical homing on power-up establishes the mechanical zero, while manual homing sets the work-area zero for calculations; the two do not interfere.

* modify tip actions

* modify the virtual simulation method

---------

Co-authored-by: zhangshixiang <@zhangshixiang>
Co-authored-by: Junhan Chang <changjh@dp.tech>

* standardize OPC UA device integration into unilab (#78)

* initial commit, keeping only the current workspace state

* remove redundant arm_slider meshes

---------

Co-authored-by: Junhan Chang <changjh@dp.tech>

* add new laiyu liquid driver, yaml and json files (#164)

* HR material sync; fix front-end display positions (#135)

* update Bioyond workstation config: add new material type mappings and carrier definitions, improve the material query logic

* add a Bioyond experiment config defining material type mappings and device configuration

* update the bioyond_warehouse_reagent_stack method: correct the reagent stack dimensions and layout description

* update the Bioyond experiment config: correct material type mappings, improve device configuration

* update the Bioyond resource sync logic: streamline the material intake flow, strengthen error handling and logging

* update Bioyond resources: add dedicated carriers for the dispensing and reaction stations, improve the warehouse factory ordering

* update Bioyond resources: add dispensing and reaction station carriers, improve reagent and sample bottle configs

* update the Bioyond experiment config: correct reagent bottle carrier IDs to match the devices

* update Bioyond resources: remove the reaction station single-beaker carrier, add a reaction station single-flask carrier category

* Refactor Bioyond resource synchronization and update bottle carrier definitions

- Removed traceback printing in error handling for Bioyond synchronization.
- Enhanced logging for existing Bioyond material ID usage during synchronization.
- Added new bottle carrier definitions for single flask and updated existing ones.
- Refactored dispensing station and reaction station bottle definitions for clarity and consistency.
- Improved resource mapping and error handling in graphio for Bioyond resource conversion.
- Introduced layout parameter in warehouse factory for better warehouse configuration.

* update the Bioyond warehouse factory: support ordering options and improve coordinate calculation

* update Bioyond carrier and deck configs: adjust sample plate dimensions and warehouse coordinates

* update Bioyond resource sync: richer occupied-position logging, corrected coordinate conversion

* update Bioyond reaction and dispensing station configs: adjust material type mappings and IDs, remove unneeded items

* support name change during materials change

* fix json dumps

* correct tip

* improve scheduler API paths and update the related method descriptions

* update the BIOYOND carrier docs, adjust the API to support carrier types with built-in reagent bottles, and fix child-material handling during resource fetches

* sync deletions when removing resources and improve the outbound (checkout) logic

* fix the visibility logic in ItemizedCarrier

* save the original Bioyond info into unilabos_extra so it can be looked up at checkout

* use resource.capacity to decide between a reagent bottle (carrier) and a multi-bottle carrier, taking different Bioyond conversions

* Fix bioyond bottle_carriers ordering

* improve the Bioyond material sync logic with better coordinate parsing and position-update handling

* disable slave connect websocket

* correct remove_resource stats

* change uuid logger to trace level

* enable slave mode

* refactor(bioyond): unify resource naming and improve the material sync logic

- Unify the DispensingStation and ReactionStation resources under the PolymerStation name
- Improve material sync to support querying consumable types (typeMode=0)
- Add default material parameter configuration
- Adjust the warehouse coordinate layout
- Clean up deprecated resource definitions

* feat(warehouses): add col_offset and layout parameters to the warehouse functions

* refactor: update the material type mapping names in the experiment config

Rename the DispensingStation and ReactionStation material type mappings to PolymerStation for naming consistency

* fix: update the carrier name in the experiment config from 6VialCarrier to 6StockCarrier

* feat(bioyond): split material creation from intake

Split the material sync flow into two independent phases: the transfer phase only creates materials, and the add phase performs the intake
Simplify the status check interface to return only the connection state

* fix(reaction_station): correct the liquid-feeding beaker volume unit and enrich the result

Change the liquid-feeding beaker volume unit from μL to g to match actual usage
Add merged_workflow and order_params fields to the result for more complete workflow info

* feat(dispensing_station): include order_params in the task creation result

Add an order_params field to the create_order return value so callers get the full task parameters

* fix(dispensing_station): use the 90% material directly instead of splitting it into three parts

The main weighed solid used to be split evenly into three parts as the 90% material; now main_portion is used directly

* feat(bioyond): output task codes and task IDs to support status monitoring after batch task creation

* refactor(registry): simplify task-result handling in the device config

Merge the separate task code and task ID fields into a unified return_info field
Update the related descriptions to reflect the new data structure

* feat(workstation): add an HTTP reporting service and task completion tracking

- Add the API-required fields in graphio.py
- Implement start/stop logic for the workstation HTTP service
- Add a task completion tracking dict and wait methods
- Rewrite the task completion report handler to record status
- Support waiting on batches of tasks and fetching their reports

* refactor(dispensing_station): remove wait_for_order_completion_and_get_report

Superseded by wait_for_multiple_orders_and_get_reports; simplifies the code

* fix: fix the task report API error

* fix(workstation_http_service): fix device_id retrieval in status queries

Fetch device_id safely during status queries to avoid exceptions when the attribute is missing

* fix(bioyond_studio): improve error handling and logging when material intake fails

Print more detailed error info when the material intake API call fails
Also correct station.py's handling of empty responses and failures

* refactor(bioyond): improve bottle carrier slot assignment and its comments

Rework the bottle carrier slot assignment with nested loops instead of hard-coded index assignment
Add more detailed coordinate-mapping comments clarifying the PLR-to-Bioyond correspondence

* fix(bioyond_rpc): fix the empty return when material intake succeeds without a data field

When the API reports success but returns no data field, return a dict containing a success flag instead of an empty dict

---------

Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>
Co-authored-by: Junhan Chang <changjh@dp.tech>

* nmr

* Update devices

* bump version to 0.10.10

* Update repo files.

* Add get_resource_with_dir & get_resource method

* fix camera & workstation & warehouse & reaction station driver

* update docs, test examples
fix liquid_handler init bug

* bump version to 0.10.11

* Add startup_json_path, disable_browser, port config

* Update oss config

* feat(bioyond_studio): add project API support and improve material management

Add generic project API methods (_post_project_api, _delete_project_api) for interacting with the LIMS
Implement a compute_experiment_design method for experiment design calculations
Add order-related methods such as brief_step_parameters
Improve the material transfer logic with async task handling
Extend the BioyondV1RPC class with batch material operations and order status management

* feat(bioyond): add a measurement vial warehouse and update warehouse factory parameters

* Support unilabos_samples key

* add session_id and normal_exit

* Add result schema and add TypedDict conversion.

* Fix port error

* Add backend api and update doc

* Add get_regular_container func

* Add get_regular_container func

* Transfer_liquid (#176)

* change 9320 deck row number to 4

* Updated 9320 host address

* Updated 9320 host address

* Add **kwargs in classes: PRCXI9300Deck and PRCXI9300Container

* Removed all sample_id in prcxi_9320.json to avoid KeyError

* 9320 machine testing settings

* Typo

* Typo in base_device_node.py

* Enhance liquid handling functionality by adding support for multiple transfer modes (one-to-many, one-to-one, many-to-one) and improving parameter validation. Default channel usage is set when not specified. Adjusted mixing logic to ensure it only occurs when valid conditions are met. Updated documentation for clarity.

* Auto dump logs, fix workstation input schema

* Fix startup with remote resource error

Resource dict fully changed to the "pose" key

Update oss link

Reduce pylabrobot conversion warnings & force-enable log dump.

Update the logo image

* signal when host node is ready

* fix ros2 future

print all logs to file
fix resource dict dump error

* update version to 0.10.12

* change the sample_uuid return value

* change the pose tag assignment mechanism

* add a return value to the aspirate function

* return the sample_uuid after dispense

* add a reset method for self.pending_liquids_dict

* modify the prcxi JSON file to fix the trash error

* modify the prcxi JSON to prevent PlateT4 hardware errors

* partially modify the laiyu liquid handling station to remove repeated initialization

* update the visualization for the new material format

* add a tip-switching method; add mock shaking and heating methods

* add gripper

* remove redundant laiyu parts

* gripper can be started from the cloud

* Delete __init__.py

* Enhance PRCXI9300 classes with new Container and TipRack implementations, improving state management and initialization logic. Update JSON configuration to reflect type changes for containers and plates.

* modify uploaded data

---------

Co-authored-by: Junhan Chang <changjh@dp.tech>
Co-authored-by: ZiWei <131428629+ZiWei09@users.noreply.github.com>
Co-authored-by: Guangxin Zhang <guangxin.zhang.bio@gmail.com>
Co-authored-by: Xie Qiming <97236197+Andy6M@users.noreply.github.com>
Co-authored-by: h840473807 <47357934+h840473807@users.noreply.github.com>
Co-authored-by: LccLink <1951855008@qq.com>
Co-authored-by: lixinyu1011 <61094742+lixinyu1011@users.noreply.github.com>
Co-authored-by: shiyubo0410 <shiyubo@dp.tech>
Co-authored-by: hh.(SII) <103566763+Mile-Away@users.noreply.github.com>
Co-authored-by: Xianwei Qi <qxw@stu.pku.edu.cn>
Co-authored-by: WenzheG <wenzheguo32@gmail.com>
Co-authored-by: Harry Liu <113173203+ALITTLELZ@users.noreply.github.com>
Co-authored-by: q434343 <73513873+q434343@users.noreply.github.com>
Co-authored-by: tt <166512503+tt11142023@users.noreply.github.com>
Co-authored-by: xyc <49015816+xiaoyu10031@users.noreply.github.com>
Co-authored-by: zhangshixiang <@zhangshixiang>
Co-authored-by: zhangshixiang <554662886@qq.com>
Co-authored-by: ALITTLELZ <l_LZlz@163.com>

Add topic config

add camera driver (#191)

* add camera driver

* add init.py file to cameraSII driver

Enhanced Neware Battery Test System OSS Upload (#196)

* feat: neware-oss-upload-enhancement

* feat(neware): enhance OSS upload with metadata and workflow handles

Add post process station and related resources (#195)

* Add post process station and related resources

- Created JSON configuration for post_process_station and its child post_process_deck.
- Added YAML definitions for post_process_station, bottle carriers, bottles, and deck resources.
- Implemented Python classes for bottle carriers, bottles, decks, and warehouses to manage resources in the post process.
- Established a factory method for creating warehouses with customizable dimensions and layouts.
- Defined the structure and behavior of the post_process_deck and its associated warehouses.

* feat(post_process): add post_process_station and related warehouse functionality

- Introduced post_process_station.json to define the post-processing station structure.
- Implemented post_process_warehouse.py to create warehouse configurations with customizable layouts.
- Added warehouses.py for specific warehouse configurations (4x3x1).
- Updated post_process_station.yaml to reflect new module paths for OpcUaClient.
- Refactored bottle carriers and bottles YAML files to point to the new module paths.
- Adjusted deck.yaml to align with the new organizational structure for post_process_deck.

prcxi resource (#202)

* prcxi resource

* prcxi_resource

* Fix upload error not showing.
Support str type category.

---------

Co-authored-by: Xuwznln <18435084+Xuwznln@users.noreply.github.com>

Fix upload error not showing.
Support str type category.

feat: introduce `wait_time` command and configurable device communication timeout.

feat: Add `SyringePump` (SY-03B) driver with unified serial/TCP transport for `chinwe` device, including registry and test configurations.

import json
import time
import traceback
from typing import Any, Union, List, Dict, Callable, Optional, Tuple
from pydantic import BaseModel
from opcua import Client, ua
import pandas as pd
import os
from unilabos.device_comms.opcua_client.node.uniopcua import Base as OpcUaNodeBase
from unilabos.device_comms.opcua_client.node.uniopcua import Variable, Method, NodeType, DataType
from unilabos.device_comms.universal_driver import UniversalDriver
from unilabos.utils.log import logger
from unilabos.devices.workstation.post_process.decks import post_process_deck
class OpcUaNode(BaseModel):
    name: str
    node_type: NodeType
    node_id: str = ""
    data_type: Optional[DataType] = None
    parent_node_id: Optional[str] = None


class OpcUaWorkflow(BaseModel):
    name: str
    actions: List[Union["OpcUaWorkflow", Callable[[Callable[[str], OpcUaNodeBase]], None]]]


class Action(BaseModel):
    name: str
    rw: bool  # read is 0, write is 1


class WorkflowAction(BaseModel):
    init: Optional[Callable[[Callable[[str], OpcUaNodeBase]], bool]] = None
    start: Optional[Callable[[Callable[[str], OpcUaNodeBase]], bool]] = None
    stop: Optional[Callable[[Callable[[str], OpcUaNodeBase]], bool]] = None
    cleanup: Optional[Callable[[Callable[[str], OpcUaNodeBase]], None]] = None


class OpcUaWorkflowModel(BaseModel):
    name: str
    actions: List[Union["OpcUaWorkflowModel", WorkflowAction]]
    parameters: Optional[List[str]] = None
    description: Optional[str] = None


# Models for front-end/back-end JSON parsing


class NodeFunctionJson(BaseModel):
    func_name: str
    node_name: str
    mode: str  # read, write, call
    value: Any = None


class InitFunctionJson(NodeFunctionJson):
    pass


class StartFunctionJson(NodeFunctionJson):
    write_functions: List[str]
    condition_functions: List[str]
    stop_condition_expression: str


class StopFunctionJson(NodeFunctionJson):
    pass


class CleanupFunctionJson(NodeFunctionJson):
    pass


class ActionJson(BaseModel):
    node_function_to_create: List[NodeFunctionJson]
    create_init_function: Optional[InitFunctionJson] = None
    create_start_function: Optional[StartFunctionJson] = None
    create_stop_function: Optional[StopFunctionJson] = None
    create_cleanup_function: Optional[CleanupFunctionJson] = None


class SimplifiedActionJson(BaseModel):
    """Simplified action JSON format: defines the node list and functions directly."""
    nodes: Optional[Dict[str, Dict[str, Any]]] = None  # node definitions, in the form {func_name: {node_name, mode, value}}
    init_function: Optional[Dict[str, Any]] = None
    start_function: Optional[Dict[str, Any]] = None
    stop_function: Optional[Dict[str, Any]] = None
    cleanup_function: Optional[Dict[str, Any]] = None
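

# For illustration only: a minimal SimplifiedActionJson payload of the shape
# create_action_from_json below consumes. The node names ("TargetTemp",
# "StartButton", "Busy") and values are invented for this sketch and do not
# come from any real server configuration.
#
# _example_simplified_action = {
#     "init_function": {"func_name": "init_demo", "write_nodes": {"TargetTemp": "TargetTemp"}},
#     "start_function": {
#         "func_name": "start_demo",
#         "write_nodes": {"StartButton": True},
#         "condition_nodes": ["Busy"],
#         "stop_condition_expression": "Busy == False",
#     },
#     "stop_function": {"func_name": "stop_demo", "write_nodes": ["StartButton"]},
# }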
class WorkflowCreateJson(BaseModel):
    name: str
    action: List[Union[ActionJson, SimplifiedActionJson, 'WorkflowCreateJson', str]]
    parameters: Optional[List[str]] = None
    description: Optional[str] = None


class ExecuteProcedureJson(BaseModel):
    register_node_list_from_csv_path: Optional[Dict[str, Any]] = None
    create_flow: List[WorkflowCreateJson]
    execute_flow: List[str]
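

# For illustration only: a full ExecuteProcedureJson payload as accepted by
# execute_procedure_from_json below. The CSV path "nodes.csv" and the workflow
# name are hypothetical; _example_simplified_action refers to the sketch above.
#
# _example_procedure = {
#     "register_node_list_from_csv_path": {"path": "nodes.csv"},
#     "create_flow": [
#         {
#             "name": "heat_and_wait",
#             "action": [_example_simplified_action],
#             "parameters": ["TargetTemp"],
#             "description": "Write a setpoint, press start, poll Busy until idle.",
#         }
#     ],
#     "execute_flow": ["heat_and_wait"],
# }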
class BaseClient(UniversalDriver):
    client: Optional[Client] = None
    _node_registry: Dict[str, OpcUaNodeBase] = {}
    DEFAULT_ADDRESS_PATH = ""
    _variables_to_find: Dict[str, Dict[str, Any]] = {}
    _name_mapping: Dict[str, str] = {}  # English name -> Chinese name
    _reverse_mapping: Dict[str, str] = {}  # Chinese name -> English name
    # Cache the resolved ua.Node objects directly so subscriptions do not fail
    # because of string NodeId formatting.
    _found_node_objects: Dict[str, Any] = {}

    def __init__(self):
        super().__init__()
        # Automatic node discovery is enabled by default.
        self._auto_find_nodes = True
        # Initialize the name-mapping dictionaries.
        self._name_mapping = {}
        self._reverse_mapping = {}
        # Initialize the thread lock (subclasses may recreate it; this is the default).
        import threading
        self._client_lock = threading.RLock()
    def _set_client(self, client: Optional[Client]) -> None:
        if client is None:
            raise ValueError('client is not valid')
        self.client = client

    def _connect(self) -> None:
        logger.info('try to connect client...')
        if self.client:
            try:
                self.client.connect()
                logger.info('client connected!')
                # Start node discovery once connected.
                if self._variables_to_find:
                    self._find_nodes()
            except Exception as e:
                logger.error(f'client connect failed: {e}')
                raise
        else:
            raise ValueError('client is not initialized')
    def _find_nodes(self) -> None:
        """Discover the registered nodes on the server."""
        if not self.client:
            raise ValueError('client is not connected')
        logger.info(f'Searching for {len(self._variables_to_find)} nodes...')
        try:
            # Start from the Objects folder under the root node.
            root = self.client.get_root_node()
            objects = root.get_child(["0:Objects"])
            # Record the registry size before searching.
            before_count = len(self._node_registry)
            # Walk the address space.
            self._find_nodes_recursive(objects)
            # Record the registry size after searching.
            after_count = len(self._node_registry)
            newly_found = after_count - before_count
            logger.info(f"Found {newly_found} new nodes in this pass, {after_count} in total")
            # Check whether every requested node has been found.
            not_found = []
            for var_name, var_info in self._variables_to_find.items():
                if var_name not in self._node_registry:
                    not_found.append(var_name)
            if not_found:
                logger.warning(f"⚠ {len(not_found)} nodes not found: {', '.join(not_found[:10])}{'...' if len(not_found) > 10 else ''}")
                logger.warning("Hint: check that these node names exactly match the server's BrowseName (including case and spaces)")
                # Provide an example to help debugging.
                if not_found:
                    logger.info(f"Trying to locate nodes similar to the first missing node '{not_found[0]}' on the server...")
            else:
                logger.info(f"✓ All {len(self._variables_to_find)} nodes found and registered")
        except Exception as e:
            logger.error(f"Node discovery failed: {e}")
            traceback.print_exc()
    def _find_nodes_recursive(self, node) -> None:
        """Recursively search for nodes."""
        try:
            # Get the browse name of the current node.
            browse_name = node.get_browse_name()
            node_name = browse_name.Name
            # Check whether this is one of the variables we are looking for.
            if node_name in self._variables_to_find and node_name not in self._node_registry:
                var_info = self._variables_to_find[node_name]
                node_type = var_info.get("node_type")
                data_type = var_info.get("data_type")
                node_id_str = str(node.nodeid)
                # Create the matching wrapper object for the node type.
                if node_type == NodeType.VARIABLE:
                    self._node_registry[node_name] = Variable(self.client, node_name, node_id_str, data_type)
                    logger.info(f"✓ Found variable node: '{node_name}', NodeId: {node_id_str}, DataType: {data_type}")
                    # Cache the real ua.Node object for subscriptions.
                    self._found_node_objects[node_name] = node
                elif node_type == NodeType.METHOD:
                    # Method nodes also need their parent node ID.
                    parent_node = node.get_parent()
                    parent_node_id = str(parent_node.nodeid)
                    self._node_registry[node_name] = Method(self.client, node_name, node_id_str, parent_node_id, data_type)
                    logger.info(f"✓ Found method node: '{node_name}', NodeId: {node_id_str}, ParentId: {parent_node_id}")
            # Recurse into child nodes.
            for child in node.get_children():
                self._find_nodes_recursive(child)
        except Exception:
            # Ignore errors on individual nodes and keep processing the rest.
            pass
    @classmethod
    def load_csv(cls, file_path: str) -> Tuple[List[OpcUaNode], Dict[str, str], Dict[str, str]]:
        """
        Load node definitions from a CSV file.
        The CSV must contain Name, NodeType, and DataType columns,
        and may optionally contain EnglishName and NodeLanguage columns.
        """
        df = pd.read_csv(file_path)
        df = df.drop_duplicates(subset='Name', keep='first')  # duplicate entries should arguably raise an error
        nodes = []
        # Check for the optional EnglishName and NodeLanguage columns.
        has_english_name = 'EnglishName' in df.columns
        has_node_language = 'NodeLanguage' in df.columns
        # If an EnglishName column exists, build the name-mapping dictionaries.
        name_mapping = {}
        reverse_mapping = {}
        for _, row in df.iterrows():
            name = row.get('Name')
            node_type_str = row.get('NodeType')
            data_type_str = row.get('DataType')
            # Read the English name and node language, if present.
            english_name = row.get('EnglishName') if has_english_name else None
            node_language = row.get('NodeLanguage') if has_node_language else 'English'  # defaults to English
            # If an English name is given for a Chinese node, add it to the mappings.
            if english_name and not pd.isna(english_name) and node_language == 'Chinese':
                name_mapping[english_name] = name
                reverse_mapping[name] = english_name
            if not name or not node_type_str:
                logger.warning("Skipping invalid row: name or node type missing")
                continue
            # Only VARIABLE and METHOD node types are supported.
            if node_type_str not in ['VARIABLE', 'METHOD']:
                logger.warning(f"Unsupported node type: {node_type_str}; only VARIABLE and METHOD are supported")
                continue
            try:
                node_type = NodeType[node_type_str]
            except KeyError:
                logger.warning(f"Invalid node type: {node_type_str}")
                continue
            # VARIABLE nodes must specify a data type.
            if node_type == NodeType.VARIABLE:
                if not data_type_str or pd.isna(data_type_str):
                    logger.warning(f"Variable node {name} must specify a data type")
                    continue
                try:
                    data_type = DataType[data_type_str]
                except KeyError:
                    logger.warning(f"Invalid data type: {data_type_str}")
                    continue
            else:
                # The data type is optional for METHOD nodes.
                data_type = None
                if data_type_str and not pd.isna(data_type_str):
                    try:
                        data_type = DataType[data_type_str]
                    except KeyError:
                        logger.warning(f"Invalid data type: {data_type_str}; using the default")
            # Create the node object; the node ID is left empty and resolved by auto-discovery.
            nodes.append(OpcUaNode(
                name=name,
                node_type=node_type,
                data_type=data_type
            ))
        # Return the node list and both name-mapping dictionaries.
        return nodes, name_mapping, reverse_mapping
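
    # For illustration only: the CSV layout load_csv expects, including the
    # optional EnglishName/NodeLanguage columns. The rows are invented, and the
    # DataType values assume the DataType enum defines STRING and BOOLEAN members.
    #
    # Name,NodeType,DataType,EnglishName,NodeLanguage
    # 反应罐号码,VARIABLE,STRING,ReactorNumber,Chinese
    # StartButton,VARIABLE,BOOLEAN,,English
    # StartProcess,METHOD,,,English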
    def use_node(self, name: str) -> OpcUaNodeBase:
        """
        Get a registered node.
        If the node has not been found yet, try discovering it again.
        Chinese nodes may be accessed by their English names.
        """
        # Check whether an English name is being used for a Chinese node.
        if name in self._name_mapping:
            chinese_name = self._name_mapping[name]
            if chinese_name in self._node_registry:
                node = self._node_registry[chinese_name]
                logger.debug(f"Using node: '{name}' -> '{chinese_name}', NodeId: {node.node_id}")
                return node
            elif chinese_name in self._variables_to_find:
                logger.warning(f"Node {chinese_name} (English name: {name}) not found yet, retrying discovery")
                if self.client:
                    self._find_nodes()
                    if chinese_name in self._node_registry:
                        node = self._node_registry[chinese_name]
                        logger.info(f"Re-discovery succeeded: '{chinese_name}', NodeId: {node.node_id}")
                        return node
            raise ValueError(f'Node {chinese_name} (English name: {name}) is not registered or was not found')
        # Look up the name directly.
        if name not in self._node_registry:
            if name in self._variables_to_find:
                logger.warning(f"Node {name} not found yet, retrying discovery")
                if self.client:
                    self._find_nodes()
                    if name in self._node_registry:
                        node = self._node_registry[name]
                        logger.info(f"Re-discovery succeeded: '{name}', NodeId: {node.node_id}")
                        return node
            logger.error(f"❌ Node '{name}' is not registered or was not found. Registered nodes: {list(self._node_registry.keys())[:5]}...")
            raise ValueError(f'Node {name} is not registered or was not found')
        node = self._node_registry[name]
        logger.debug(f"Using node: '{name}', NodeId: {node.node_id}")
        return node
    def get_node_registry(self) -> Dict[str, OpcUaNodeBase]:
        return self._node_registry

    def register_node_list_from_csv_path(self, path: str = None) -> "BaseClient":
        """Register nodes from a CSV file."""
        if path is None:
            path = self.DEFAULT_ADDRESS_PATH
        nodes, name_mapping, reverse_mapping = self.load_csv(path)
        self._name_mapping.update(name_mapping)
        self._reverse_mapping.update(reverse_mapping)
        return self.register_node_list(nodes)

    def register_node_list(self, node_list: List[OpcUaNode]) -> "BaseClient":
        """Register a list of nodes."""
        if not node_list or len(node_list) == 0:
            logger.warning('Node list is empty')
            return self
        logger.info(f'Registering {len(node_list)} nodes...')
        new_nodes_count = 0
        for node in node_list:
            if node is None:
                continue
            if node.name in self._node_registry:
                logger.debug(f'Node "{node.name}" already exists in the registry')
                exist = self._node_registry[node.name]
                if exist.type != node.node_type:
                    raise ValueError(f'Node {node.name} type {node.node_type} conflicts with the existing type {exist.type}')
                continue
            # Queue the node for discovery.
            self._variables_to_find[node.name] = {
                "node_type": node.node_type,
                "data_type": node.data_type
            }
            new_nodes_count += 1
            logger.debug(f'Added node "{node.name}" ({node.node_type}) to the discovery queue')
        logger.info(f'Node registration done: {new_nodes_count} new nodes queued, {len(self._variables_to_find)} in total')
        # If the client is already connected, start discovery immediately.
        if self.client:
            self._find_nodes()
        return self
    def run_opcua_workflow(self, workflow: OpcUaWorkflow) -> None:
        if not self.client:
            raise ValueError('client is not connected')
        logger.info(f'start to run workflow {workflow.name}...')
        for action in workflow.actions:
            if isinstance(action, OpcUaWorkflow):
                self.run_opcua_workflow(action)
            elif callable(action):
                action(self.use_node)
            else:
                raise ValueError(f'invalid action {action}')

    def call_lifecycle_fn(
        self,
        workflow: OpcUaWorkflowModel,
        fn: Optional[Callable[[Callable], bool]],
    ) -> bool:
        if not fn:
            raise ValueError('fn is not valid in call_lifecycle_fn')
        try:
            result = fn(self.use_node)
            # Handle functions that return a tuple.
            if isinstance(result, tuple) and len(result) == 2:
                # The second element is an error flag: True means error, False means success.
                value, error_flag = result
                return not error_flag  # invert so that True means success, False means failure
            return result
        except Exception as e:
            traceback.print_exc()
            logger.error(f'execute {workflow.name} lifecycle failed, err: {e}')
            return False

    def run_opcua_workflow_model(self, workflow: OpcUaWorkflowModel) -> bool:
        if not self.client:
            raise ValueError('client is not connected')
        logger.info(f'start to run workflow {workflow.name}...')
        for action in workflow.actions:
            if isinstance(action, OpcUaWorkflowModel):
                if self.run_opcua_workflow_model(action):
                    logger.info(f"{action.name} workflow done.")
                    continue
                else:
                    logger.error(f"{action.name} workflow failed")
                    return False
            elif isinstance(action, WorkflowAction):
                init = action.init
                start = action.start
                stop = action.stop
                cleanup = action.cleanup
                if not init and not start and not stop:
                    raise ValueError(f'invalid action {action}')
                is_err = False
                try:
                    if init and not self.call_lifecycle_fn(workflow, init):
                        raise ValueError(f"{workflow.name} init action failed")
                    if not self.call_lifecycle_fn(workflow, start):
                        raise ValueError(f"{workflow.name} start action failed")
                    if not self.call_lifecycle_fn(workflow, stop):
                        raise ValueError(f"{workflow.name} stop action failed")
                    logger.info(f"{workflow.name} action done.")
                except Exception as e:
                    is_err = True
                    traceback.print_exc()
                    logger.error(f"{workflow.name} action failed, err: {e}")
                finally:
                    logger.info(f"{workflow.name} try to run cleanup")
                    if cleanup:
                        self.call_lifecycle_fn(workflow, cleanup)
                    else:
                        logger.info(f"{workflow.name} cleanup is not defined")
                if is_err:
                    return False
                # Returns after the first WorkflowAction completes, as in the original control flow.
                return True
            else:
                raise ValueError(f'invalid action type {type(action)}')
        return True
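
    # For illustration only: a workflow model built by hand from plain callables,
    # bypassing the JSON helpers below. "client" and the node name are assumed.
    #
    # _manual_action = WorkflowAction(
    #     start=lambda use_node: json.loads(
    #         client.write_node(json.dumps({"node_name": "StartButton", "value": True}))
    #     ).get("success", False),
    #     stop=lambda use_node: True,  # nothing to undo in this sketch
    # )
    # client.run_opcua_workflow_model(
    #     OpcUaWorkflowModel(name="manual_demo", actions=[_manual_action])
    # )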
    function_name: Dict[str, Callable[[Callable[[str], OpcUaNodeBase]], bool]] = {}

    def create_node_function(self, func_name: str = None, node_name: str = None, mode: str = None, value: Any = None, **kwargs) -> Callable[[Callable[[str], OpcUaNodeBase]], bool]:
        def execute_node_function(use_node: Callable[[str], OpcUaNodeBase]) -> Union[bool, Tuple[Any, bool]]:
            target_node = use_node(node_name)
            # Check whether a workflow parameter overrides the configured value.
            current_value = value
            if hasattr(self, '_workflow_params') and func_name in self._workflow_params:
                current_value = self._workflow_params[func_name]
                print(f"Using parameter value {func_name} = {current_value}")
            else:
                print(f"Executing {node_name}, {type(target_node).__name__}, {target_node.node_id}, {mode}, {current_value}")
            if mode == 'read':
                result_str = self.read_node(node_name)
                try:
                    # Convert the string into a dict.
                    result_str = result_str.replace("'", '"')  # swap single quotes for double quotes so JSON parses
                    result_dict = json.loads(result_str)
                    # Extract the value and error flag from the dict.
                    val = result_dict.get("value")
                    err = result_dict.get("error")
                    print(f"Read {node_name} returned = {val} (type: {type(val).__name__}), error = {err}")
                    return val, err
                except Exception as e:
                    print(f"Failed to parse read result: {e}, raw result: {result_str}")
                    return None, True
            elif mode == 'write':
                # Build the full JSON input containing node_name and value.
                input_json = json.dumps({"node_name": node_name, "value": current_value})
                result_str = self.write_node(input_json)
                try:
                    # Parse the returned string into a dict.
                    result_str = result_str.replace("'", '"')  # swap single quotes for double quotes so JSON parses
                    result = json.loads(result_str)
                    success = result.get("success", False)
                    print(f"Wrote {node_name} = {current_value}, result = {success}")
                    return success
                except Exception as e:
                    print(f"Failed to parse write result: {e}, raw result: {result_str}")
                    return False
            elif mode == 'call' and hasattr(target_node, 'call'):
                args = current_value if isinstance(current_value, list) else [current_value]
                result = target_node.call(*args)
                print(f"Called method {node_name} with args = {args}, return value = {result}")
                return result
            return False

        if func_name is None:
            func_name = f"{node_name}_{mode}_{str(value)}"
        print(f"Creating node function: {mode}, {func_name}")
        self.function_name[func_name] = execute_node_function
        return execute_node_function
    def create_init_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None):
        """
        Create an init function.

        Args:
            func_name: function name
            write_nodes: write-node config, either a list of node names [node1, node2]
                or a node-to-value mapping {node1: value1, node2: value2}.
                A value may be a concrete value or a parameter-name string looked up in _workflow_params.
        """
        if write_nodes is None:
            raise ValueError("write_nodes must be provided")

        def execute_init_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool:
            """Write concrete values to each node according to _workflow_params.

            Conventions:
            - If write_nodes is a list: node name == parameter name; the value comes from _workflow_params[node_name].
            - If write_nodes is a dict:
              * if the value is a string present in _workflow_params, treat it as a parameter name;
              * otherwise treat the value as a constant and write it directly.
            """
            params = getattr(self, "_workflow_params", {}) or {}
            if isinstance(write_nodes, list):
                # List form: node names match parameter names.
                for node_name in write_nodes:
                    if node_name not in params:
                        print(f"Init function: {node_name} not found in params, skipping write")
                        continue
                    current_value = params[node_name]
                    print(f"Init function: writing node {node_name} = {current_value}")
                    input_json = json.dumps({"node_name": node_name, "value": current_value})
                    result_str = self.write_node(input_json)
                    try:
                        result_str = result_str.replace("'", '"')
                        result = json.loads(result_str)
                        success = result.get("success", False)
                        print(f"Init function: write result = {success}")
                    except Exception as e:
                        print(f"Init function: failed to parse write result: {e}, raw result: {result_str}")
            elif isinstance(write_nodes, dict):
                # Mapping form: node name -> parameter name or constant.
                for node_name, node_value in write_nodes.items():
                    if isinstance(node_value, str) and node_value in params:
                        current_value = params[node_value]
                        print(f"Init function: fetched parameter value {node_value} = {current_value}")
                    else:
                        current_value = node_value
                        print(f"Init function: using constant value, writing {node_name} = {current_value}")
                    print(f"Init function: writing node {node_name} = {current_value}")
                    input_json = json.dumps({"node_name": node_name, "value": current_value})
                    result_str = self.write_node(input_json)
                    try:
                        result_str = result_str.replace("'", '"')
                        result = json.loads(result_str)
                        success = result.get("success", False)
                        print(f"Init function: write result = {success}")
                    except Exception as e:
                        print(f"Init function: failed to parse write result: {e}, raw result: {result_str}")
            return True

        if func_name is None:
            func_name = f"init_function_{str(time.time())}"
        print(f"Creating init function: {func_name}")
        self.function_name[func_name] = execute_init_function
        return execute_init_function
    def create_stop_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None):
        """
        Create a stop function.

        Args:
            func_name: function name
            write_nodes: write-node config, either a list of node names [node1, node2]
                or a node-to-value mapping {node1: value1, node2: value2}
        """
        if write_nodes is None:
            raise ValueError("write_nodes must be provided")

        def execute_stop_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool:
            if isinstance(write_nodes, list):
                # List form: every node defaults to False.
                for node_name in write_nodes:
                    # Write False directly.
                    print(f"Stop function: writing node {node_name} = False")
                    input_json = json.dumps({"node_name": node_name, "value": False})
                    result_str = self.write_node(input_json)
                    try:
                        result_str = result_str.replace("'", '"')
                        result = json.loads(result_str)
                        success = result.get("success", False)
                        print(f"Stop function: write result = {success}")
                    except Exception as e:
                        print(f"Stop function: failed to parse write result: {e}, raw result: {result_str}")
            elif isinstance(write_nodes, dict):
                # Dict form: write the specified values.
                for node_name, node_value in write_nodes.items():
                    print(f"Stop function: writing node {node_name} = {node_value}")
                    input_json = json.dumps({"node_name": node_name, "value": node_value})
                    result_str = self.write_node(input_json)
                    try:
                        result_str = result_str.replace("'", '"')
                        result = json.loads(result_str)
                        success = result.get("success", False)
                        print(f"Stop function: write result = {success}")
                    except Exception as e:
                        print(f"Stop function: failed to parse write result: {e}, raw result: {result_str}")
            return True

        if func_name is None:
            func_name = f"stop_function_{str(time.time())}"
        print(f"Creating stop function: {func_name}")
        self.function_name[func_name] = execute_stop_function
        return execute_stop_function
    def create_cleanup_function(self, func_name: str = None, write_nodes: Union[Dict[str, Any], List[str]] = None):
        """
        Create a cleanup function.

        Args:
            func_name: function name
            write_nodes: write-node config, either a list of node names [node1, node2]
                or a node-to-value mapping {node1: value1, node2: value2}
        """
        if write_nodes is None:
            raise ValueError("write_nodes must be provided")

        def execute_cleanup_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool:
            if isinstance(write_nodes, list):
                # List form: every node defaults to False.
                for node_name in write_nodes:
                    # Write False directly.
                    print(f"Cleanup function: writing node {node_name} = False")
                    input_json = json.dumps({"node_name": node_name, "value": False})
                    result_str = self.write_node(input_json)
                    try:
                        result_str = result_str.replace("'", '"')
                        result = json.loads(result_str)
                        success = result.get("success", False)
                        print(f"Cleanup function: write result = {success}")
                    except Exception as e:
                        print(f"Cleanup function: failed to parse write result: {e}, raw result: {result_str}")
            elif isinstance(write_nodes, dict):
                # Dict form: write the specified values.
                for node_name, node_value in write_nodes.items():
                    print(f"Cleanup function: writing node {node_name} = {node_value}")
                    input_json = json.dumps({"node_name": node_name, "value": node_value})
                    result_str = self.write_node(input_json)
                    try:
                        result_str = result_str.replace("'", '"')
                        result = json.loads(result_str)
                        success = result.get("success", False)
                        print(f"Cleanup function: write result = {success}")
                    except Exception as e:
                        print(f"Cleanup function: failed to parse write result: {e}, raw result: {result_str}")
            return True

        if func_name is None:
            func_name = f"cleanup_function_{str(time.time())}"
        print(f"Creating cleanup function: {func_name}")
        self.function_name[func_name] = execute_cleanup_function
        return execute_cleanup_function
    def create_start_function(self, func_name: str, stop_condition_expression: str = "True", write_nodes: Union[Dict[str, Any], List[str]] = None, condition_nodes: Union[Dict[str, str], List[str]] = None):
        """
        Create a start function.

        Args:
            func_name: function name
            stop_condition_expression: stop-condition expression; node names may be referenced directly
            write_nodes: write-node config, either a list of node names [node1, node2]
                or a node-to-value mapping {node1: value1, node2: value2}
            condition_nodes: condition-node list [node1, node2]
        """
        def execute_start_function(use_node: Callable[[str], OpcUaNodeBase]) -> bool:
            """Start function: write the trigger nodes, then poll the condition nodes until the stop condition holds."""
            params = getattr(self, "_workflow_params", {}) or {}
            # First handle the write nodes (trigger bits etc.).
            if write_nodes:
                if isinstance(write_nodes, list):
                    # List form: node names match parameter names; write True when no parameter is given.
                    for node_name in write_nodes:
                        if node_name in params:
                            current_value = params[node_name]
                        else:
                            current_value = True
                        print(f"Writing node directly: {node_name} = {current_value}")
                        input_json = json.dumps({"node_name": node_name, "value": current_value})
                        result_str = self.write_node(input_json)
                        try:
                            result_str = result_str.replace("'", '"')
                            result = json.loads(result_str)
                            success = result.get("success", False)
                            print(f"Direct write {node_name} = {current_value}, result: {success}")
                        except Exception as e:
                            print(f"Failed to parse direct write result: {e}, raw result: {result_str}")
                elif isinstance(write_nodes, dict):
                    # Dict form: node name -> constant value (e.g. True/False).
                    for node_name, node_value in write_nodes.items():
                        if node_name in params:
                            current_value = params[node_name]
                        else:
                            current_value = node_value
                        print(f"Writing node directly: {node_name} = {current_value}")
                        input_json = json.dumps({"node_name": node_name, "value": current_value})
                        result_str = self.write_node(input_json)
                        try:
                            result_str = result_str.replace("'", '"')
                            result = json.loads(result_str)
                            success = result.get("success", False)
                            print(f"Direct write {node_name} = {current_value}, result: {success}")
                        except Exception as e:
                            print(f"Failed to parse direct write result: {e}, raw result: {result_str}")
            # Return immediately when there are no condition nodes.
            if not condition_nodes:
                return True
            # Poll the condition nodes and wait.
            while True:
                next_loop = False
                condition_source = {}
                # Read the condition nodes directly.
                if isinstance(condition_nodes, list):
                    # List form.
                    for i, node_name in enumerate(condition_nodes):
                        # Read the node directly.
                        result_str = self.read_node(node_name)
                        try:
                            time.sleep(1)
                            result_str = result_str.replace("'", '"')
                            result_dict = json.loads(result_str)
                            read_res = result_dict.get("value")
                            read_err = result_dict.get("error", False)
                            print(f"Direct read {node_name} returned = {read_res}, error = {read_err}")
                            if read_err:
                                next_loop = True
                                break
                            # Store the node value under the node name...
                            condition_source[node_name] = read_res
                            # ...and under read_<i> for backwards compatibility.
                            condition_source[f"read_{i}"] = read_res
                        except Exception as e:
                            print(f"Failed to parse direct read result: {e}, raw result: {result_str}")
                            read_res, read_err = None, True
                            next_loop = True
                            break
                elif isinstance(condition_nodes, dict):
                    # Dict form.
                    for condition_func, node_name in condition_nodes.items():
                        # Read the node directly.
                        result_str = self.read_node(node_name)
                        try:
                            result_str = result_str.replace("'", '"')
                            result_dict = json.loads(result_str)
                            read_res = result_dict.get("value")
                            read_err = result_dict.get("error", False)
                            print(f"Direct read {node_name} returned = {read_res}, error = {read_err}")
                            if read_err:
                                next_loop = True
                                break
                            # Store the node value under the node name...
                            condition_source[node_name] = read_res
                            # ...and under the function name as well.
                            condition_source[condition_func] = read_res
                        except Exception as e:
                            print(f"Failed to parse direct read result: {e}, raw result: {result_str}")
                            next_loop = True
                            break
                if not next_loop:
                    if stop_condition_expression:
                        # Debug output.
                        print(f"Condition source data: {condition_source}")
                        condition_source["__RESULT"] = None
                        # Evaluate the condition expression safely.
                        try:
                            # Prefer eval as the safer way to compute the expression.
                            result = eval(stop_condition_expression, {}, condition_source)
                            condition_source["__RESULT"] = result
                        except Exception as e:
                            print(f"Evaluating the expression with eval failed: {e}")
                            try:
                                # Fall back to exec.
                                exec(f"__RESULT = {stop_condition_expression}", {}, condition_source)
                            except Exception as e2:
                                print(f"Evaluating the expression with exec failed as well: {e2}")
                                condition_source["__RESULT"] = False
                        res = condition_source["__RESULT"]
                        print(f"Computed result: {res}, condition expression: {stop_condition_expression}")
                        if res:
                            print("Stop condition met, ending workflow")
                            break
                    else:
                        # No stop condition: exit immediately.
                        break
                else:
                    time.sleep(0.3)
            return True

        self.function_name[func_name] = execute_start_function
        return execute_start_function
    def create_action_from_json(self, data: Union[Dict, Any]) -> WorkflowAction:
        """
        Create a workflow action from a JSON config.

        Args:
            data: action JSON data

        Returns:
            a WorkflowAction object
        """
        # Initialize the variables we need.
        start_function = None
        write_nodes = {}
        condition_nodes = []
        stop_function = None
        init_function = None
        cleanup_function = None
        # Extract the start_function info.
        if hasattr(data, "start_function") and data.start_function:
            start_function = data.start_function
            if "write_nodes" in start_function:
                write_nodes = start_function["write_nodes"]
            if "condition_nodes" in start_function:
                condition_nodes = start_function["condition_nodes"]
        elif isinstance(data, dict) and data.get("start_function"):
            start_function = data.get("start_function")
            if "write_nodes" in start_function:
                write_nodes = start_function["write_nodes"]
            if "condition_nodes" in start_function:
                condition_nodes = start_function["condition_nodes"]
        # Extract the stop_function info.
        if hasattr(data, "stop_function") and data.stop_function:
            stop_function = data.stop_function
        elif isinstance(data, dict) and data.get("stop_function"):
            stop_function = data.get("stop_function")
        # Extract the init_function info.
        if hasattr(data, "init_function") and data.init_function:
            init_function = data.init_function
        elif isinstance(data, dict) and data.get("init_function"):
            init_function = data.get("init_function")
        # Extract the cleanup_function info.
        if hasattr(data, "cleanup_function") and data.cleanup_function:
            cleanup_function = data.cleanup_function
        elif isinstance(data, dict) and data.get("cleanup_function"):
            cleanup_function = data.get("cleanup_function")
        # Build the workflow action components.
        init = None
        start = None
        stop = None
        cleanup = None
        # Handle the init function.
        if init_function:
            init_params = {"func_name": init_function.get("func_name")}
            if "write_nodes" in init_function:
                init_params["write_nodes"] = init_function["write_nodes"]
            else:
                # No write_nodes given: use an empty dict.
                init_params["write_nodes"] = {}
            init = self.create_init_function(**init_params)
        # Handle the start function.
        if start_function:
            start_params = {
                "func_name": start_function.get("func_name"),
                "stop_condition_expression": start_function.get("stop_condition_expression", "True"),
                "write_nodes": write_nodes,
                "condition_nodes": condition_nodes
            }
            start = self.create_start_function(**start_params)
        # Handle the stop function.
        if stop_function:
            stop_params = {
                "func_name": stop_function.get("func_name"),
                "write_nodes": stop_function.get("write_nodes", {})
            }
            stop = self.create_stop_function(**stop_params)
        # Handle the cleanup function.
        if cleanup_function:
            cleanup_params = {
                "func_name": cleanup_function.get("func_name"),
                "write_nodes": cleanup_function.get("write_nodes", {})
            }
            cleanup = self.create_cleanup_function(**cleanup_params)
        return WorkflowAction(init=init, start=start, stop=stop, cleanup=cleanup)
    workflow_name: Dict[str, OpcUaWorkflowModel] = {}

    def create_workflow_from_json(self, data: List[Dict]) -> None:
        """
        Create workflow programs from a JSON config.

        Args:
            data: list of workflow configs
        """
        for ind, flow_dict in enumerate(data):
            print(f"Creating workflow {ind}, {flow_dict['name']}")
            actions = []
            for i in flow_dict["action"]:
                if isinstance(i, str):
                    print(f"Reusing existing workflow as action: {i}")
                    action = self.workflow_name[i]
                else:
                    print("Creating action")
                    # Convert the dict into a SimplifiedActionJson object, or use the dict directly.
                    action = self.create_action_from_json(i)
                actions.append(action)
            # Read the parameters.
            parameters = flow_dict.get("parameters", [])
            flow_instance = OpcUaWorkflowModel(
                name=flow_dict["name"],
                actions=actions,
                parameters=parameters,
                description=flow_dict.get("description", "")
            )
            print(f"Finished creating workflow: {flow_dict['name']}")
            self.workflow_name[flow_dict["name"]] = flow_instance

    def execute_workflow_from_json(self, data: List[str]) -> None:
        for i in data:
            print(f"Executing workflow: {i}")
            self.run_opcua_workflow_model(self.workflow_name[i])
    def execute_procedure_from_json(self, data: Union[ExecuteProcedureJson, Dict]) -> None:
        """Execute a workflow procedure from a JSON config."""
        if isinstance(data, dict):
            # Dict input.
            register_params = data.get("register_node_list_from_csv_path")
            create_flow = data.get("create_flow", [])
            execute_flow = data.get("execute_flow", [])
        else:
            # Pydantic model input.
            register_params = data.register_node_list_from_csv_path
            create_flow = data.create_flow
            execute_flow = data.execute_flow if hasattr(data, "execute_flow") else []
        # Register the nodes.
        if register_params:
            print(f"Registering nodes from csv: {register_params}")
            self.register_node_list_from_csv_path(**register_params)
        # Create the workflows.
        print("Creating workflows")
        self.create_workflow_from_json(create_flow)
        # Register the workflows as instance methods.
        self.register_workflows_as_methods()
        # If an execute_flow field exists, run the listed workflows (backwards compatible).
        if execute_flow:
            print("Executing workflows")
            self.execute_workflow_from_json(execute_flow)
def register_workflows_as_methods(self) -> None:
"""将工作流注册为实例方法"""
for workflow_name, workflow in self.workflow_name.items():
            # Workflow parameter metadata (if any)
            workflow_params = getattr(workflow, 'parameters', []) or []
            workflow_desc = getattr(workflow, 'description', None) or f"Execute workflow: {workflow_name}"
            # Build the method that executes this workflow
def create_workflow_method(wf_name=workflow_name, wf=workflow, params=workflow_params):
                def workflow_method(*args, **kwargs):
                    logger.info(f"Executing workflow: {wf_name}, args: {args}, kwargs: {kwargs}")
                    # Normalize the incoming parameters
                    if params and (args or kwargs):
                        # Map positional arguments onto the declared parameter names
                        params_dict = {}
                        for i, param_name in enumerate(params):
                            if i < len(args):
                                params_dict[param_name] = args[i]
                        # Merge the keyword arguments
                        params_dict.update(kwargs)
                        # Stash the parameters for the node functions to consume
                        self._workflow_params = params_dict
                    else:
                        self._workflow_params = {}
                    # Run the workflow
                    result = self.run_opcua_workflow_model(wf)
                    # Clear the parameters afterwards
                    self._workflow_params = {}
                    return result
                # Attach a docstring to the generated method
                workflow_method.__doc__ = workflow_desc
                if params:
                    param_doc = ", ".join(params)
                    workflow_method.__doc__ += f"\nParameters: {param_doc}"
                return workflow_method
            # Register as an instance method
            method = create_workflow_method()
            setattr(self, workflow_name, method)
            logger.info(f"Registered workflow '{workflow_name}' as an instance method")
    def read_node(self, node_name: str) -> str:
        """
        Convenience method for reading a node value.
        Returns a JSON string describing the result.
        """
        # Guard client access with the lock
        with self._client_lock:
            try:
                node = self.use_node(node_name)
                value, error = node.read()
                # Build the result dict
                result = {
                    "value": value,
                    "error": error,
                    "node_name": node_name,
                    "timestamp": time.time()
                }
                # Return it as a JSON string
                return json.dumps(result)
            except Exception as e:
                logger.error(f"Failed to read node {node_name}: {e}")
                # Build an error result dict
                result = {
                    "value": None,
                    "error": True,
                    "node_name": node_name,
                    "error_message": str(e),
                    "timestamp": time.time()
                }
                return json.dumps(result)
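    # Usage sketch (hypothetical node name; the return value is a JSON string):
    #     payload = json.loads(client.read_node("grab_complete"))
    #     if not payload["error"]:
    #         print(payload["value"])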
def write_node(self, json_input: str) -> str:
"""
写入节点值的便捷方法
接受单个JSON格式的字符串作为输入包含节点名称和值
eg:'{\"node_name\":\"反应罐号码\",\"value\":\"2\"}'
返回JSON格式的字符串包含操作结果
"""
# 使用锁保护客户端访问
with self._client_lock:
try:
# 解析JSON格式的输入
if not isinstance(json_input, str):
json_input = str(json_input)
try:
input_data = json.loads(json_input)
if not isinstance(input_data, dict):
return json.dumps({"error": True, "error_message": "输入必须是包含node_name和value的JSON对象", "success": False})
# 从JSON中提取节点名称和值
node_name = input_data.get("node_name")
value = input_data.get("value")
if node_name is None:
return json.dumps({"error": True, "error_message": "JSON中缺少node_name字段", "success": False})
except json.JSONDecodeError as e:
return json.dumps({"error": True, "error_message": f"JSON解析错误: {str(e)}", "success": False})
node = self.use_node(node_name)
error = node.write(value)
# 创建结果字典
result = {
"value": value,
"error": error,
"node_name": node_name,
"timestamp": time.time(),
"success": not error
}
return json.dumps(result)
except Exception as e:
logger.error(f"写入节点失败: {e}")
result = {
"error": True,
"error_message": str(e),
"timestamp": time.time(),
"success": False
}
return json.dumps(result)
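    # Usage sketch (node name taken from the docstring example):
    #     result = json.loads(client.write_node('{"node_name": "反应罐号码", "value": "2"}'))
    #     print(result["success"])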
def call_method(self, node_name: str, *args) -> Tuple[Any, bool]:
"""
调用方法节点的便捷方法
返回 (返回值, 是否出错)
"""
try:
node = self.use_node(node_name)
if hasattr(node, 'call'):
return node.call(*args)
else:
logger.error(f"节点 {node_name} 不是方法节点")
return None, True
except Exception as e:
logger.error(f"调用方法 {node_name} 失败: {e}")
return None, True
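    # Usage sketch (hypothetical method-node name):
    #     value, failed = client.call_method("start_pump", 42)
    #     if failed:
    #         logger.error("Method call failed")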
class OpcUaClient(BaseClient):
def __init__(
self,
url: str,
deck: Optional[Union[post_process_deck, Dict[str, Any]]] = None,
        config_path: Optional[str] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
use_subscription: bool = True,
cache_timeout: float = 5.0,
subscription_interval: int = 500,
*args,
**kwargs,
):
        # Quiet down the OPC UA library's logging
        import logging
        logging.getLogger("opcua").setLevel(logging.WARNING)
        super().__init__()
        # Handle the deck argument the same way BioyondWorkstation does
        if deck is None:
            self.deck = post_process_deck(setup=True)
        elif isinstance(deck, dict):
            # Dict configs currently fall back to the default deck as well
            self.deck = post_process_deck(setup=True)
        elif hasattr(deck, 'children'):
            self.deck = deck
        else:
            raise ValueError(f"Unsupported deck argument type: {type(deck)}")
        if self.deck is None:
            raise ValueError("Deck configuration must not be empty")
        # Count the warehouse resources
        warehouse_count = 0
        if hasattr(self.deck, 'children'):
            warehouse_count = len(self.deck.children)
        logger.info(f"Deck initialized with {warehouse_count} resources")
        # OPC UA client initialization
        client = Client(url)
        if username and password:
            client.set_user(username)
            client.set_password(password)
        self._set_client(client)
        # Subscription-related attributes
        self._use_subscription = use_subscription
        self._subscription = None
        self._subscription_handles = {}
        self._subscription_interval = subscription_interval
        # Cache-related attributes
        self._node_values = {}  # timestamped cache keyed by node name
        self._cache_timeout = cache_timeout
        # Connection-state monitoring
        self._connection_check_interval = 30.0  # connection check interval (seconds)
        self._connection_monitor_running = False
        self._connection_monitor_thread = None
        # Thread lock guarding concurrent access to the OPC UA client
        import threading
        self._client_lock = threading.RLock()
        # Connect to the server
        self._connect()
        # If a config path was given, load it and register the workflows
        if config_path:
            self.load_config(config_path)
        # Start the connection monitor
        self._start_connection_monitor()
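    # Construction sketch (endpoint URL and config path are hypothetical):
    #     client = OpcUaClient(
    #         url="opc.tcp://localhost:4840/freeopcua/server/",
    #         config_path="opcua_huairou.json",
    #         use_subscription=True,
    #         cache_timeout=5.0,
    #     )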
def _connect(self) -> None:
"""连接到OPC UA服务器"""
logger.info('尝试连接到 OPC UA 服务器...')
if self.client:
try:
self.client.connect()
logger.info('✓ 客户端已连接!')
# 连接后开始查找节点
if self._variables_to_find:
self._find_nodes()
# 如果启用订阅模式,设置订阅
if self._use_subscription:
self._setup_subscriptions()
else:
logger.info("订阅模式已禁用,将使用按需读取模式")
except Exception as e:
logger.error(f'客户端连接失败: {e}')
raise
else:
raise ValueError('客户端未初始化')
    class SubscriptionHandler:
        """freeopcua subscription handler; it must implement datachange_notification."""
        def __init__(self, outer):
            self.outer = outer
        def datachange_notification(self, node, val, data):
            # Delegate to the outer class's handler
            try:
                self.outer._on_subscription_datachange(node, val, data)
            except Exception as e:
                logger.error(f"Subscription data callback failed: {e}")
        # Optional event-notification placeholder so the library never hits a missing method
        def event_notification(self, event):
            pass
def _setup_subscriptions(self):
"""设置 OPC UA 订阅"""
if not self.client or not self._use_subscription:
return
with self._client_lock:
try:
logger.info(f"开始设置订阅 (发布间隔: {self._subscription_interval}ms)...")
# 创建订阅
handler = OpcUaClient.SubscriptionHandler(self)
self._subscription = self.client.create_subscription(
self._subscription_interval,
handler
)
# 为所有变量节点创建监控项
subscribed_count = 0
skipped_count = 0
for node_name, node in self._node_registry.items():
# 只为变量节点创建订阅
if node.type == NodeType.VARIABLE and node.node_id:
try:
# 优先使用在查找阶段缓存的真实 ua.Node 对象
ua_node = self._found_node_objects.get(node_name)
if ua_node is None:
ua_node = self.client.get_node(node.node_id)
handle = self._subscription.subscribe_data_change(ua_node)
self._subscription_handles[node_name] = handle
subscribed_count += 1
logger.debug(f"✓ 已订阅节点: {node_name}")
except Exception as e:
skipped_count += 1
logger.warning(f"✗ 订阅节点 {node_name} 失败: {e}")
else:
skipped_count += 1
logger.info(f"订阅设置完成: 成功 {subscribed_count} 个, 跳过 {skipped_count}")
except Exception as e:
logger.error(f"设置订阅失败: {e}")
traceback.print_exc()
# 订阅失败时回退到按需读取模式
self._use_subscription = False
logger.warning("订阅模式设置失败,已自动切换到按需读取模式")
def _on_subscription_datachange(self, node, val, data):
"""订阅数据变化处理器(供内部 SubscriptionHandler 调用)"""
try:
node_id = str(node.nodeid)
current_time = time.time()
# 查找对应的节点名称
for node_name, node_obj in self._node_registry.items():
if node_obj.node_id == node_id:
self._node_values[node_name] = {
'value': val,
'timestamp': current_time,
'source': 'subscription'
}
logger.debug(f"订阅更新: {node_name} = {val}")
break
except Exception as e:
logger.error(f"处理订阅数据失败: {e}")
def get_node_value(self, name, use_cache=True, force_read=False):
"""
获取节点值(智能缓存版本)
参数:
name: 节点名称(支持中文名或英文名)
use_cache: 是否使用缓存
force_read: 是否强制从服务器读取(忽略缓存)
"""
# 处理名称映射
if name in self._name_mapping:
chinese_name = self._name_mapping[name]
elif name in self._node_registry:
chinese_name = name
else:
raise ValueError(f"未找到名称为 '{name}' 的节点")
# 如果强制读取,直接从服务器读取
if force_read:
with self._client_lock:
value, _ = self.use_node(chinese_name).read()
# 更新缓存
self._node_values[chinese_name] = {
'value': value,
'timestamp': time.time(),
'source': 'forced_read'
}
return value
# 检查缓存
if use_cache and chinese_name in self._node_values:
cache_entry = self._node_values[chinese_name]
cache_age = time.time() - cache_entry['timestamp']
# 如果是订阅模式,缓存永久有效(由订阅更新)
# 如果是按需读取模式,检查缓存超时
if cache_entry.get('source') == 'subscription' or cache_age < self._cache_timeout:
logger.debug(f"从缓存读取: {chinese_name} = {cache_entry['value']} (age: {cache_age:.2f}s, source: {cache_entry.get('source', 'unknown')})")
return cache_entry['value']
# 缓存过期或不存在,从服务器读取
with self._client_lock:
try:
value, error = self.use_node(chinese_name).read()
if not error:
# 更新缓存
self._node_values[chinese_name] = {
'value': value,
'timestamp': time.time(),
'source': 'on_demand_read'
}
return value
else:
logger.warning(f"读取节点 {chinese_name} 失败")
return None
except Exception as e:
logger.error(f"读取节点 {chinese_name} 出错: {e}")
return None
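    # Usage sketch (English aliases as used in the __main__ demo below):
    #     status = client.get_node_value("grab_complete")                    # cached if fresh
    #     latest = client.get_node_value("grab_complete", force_read=True)   # bypass the cache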
def set_node_value(self, name, value):
"""
设置节点值
写入成功后会立即更新本地缓存
"""
# 处理名称映射
if name in self._name_mapping:
chinese_name = self._name_mapping[name]
elif name in self._node_registry:
chinese_name = name
else:
raise ValueError(f"未找到名称为 '{name}' 的节点")
with self._client_lock:
try:
node = self.use_node(chinese_name)
error = node.write(value)
if not error:
# 写入成功,立即更新缓存
self._node_values[chinese_name] = {
'value': value,
'timestamp': time.time(),
'source': 'write'
}
logger.debug(f"写入成功: {chinese_name} = {value}")
return True
else:
logger.warning(f"写入节点 {chinese_name} 失败")
return False
except Exception as e:
logger.error(f"写入节点 {chinese_name} 出错: {e}")
return False
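    # Usage sketch (alias from the __main__ demo below):
    #     if client.set_node_value("atomization_fast_speed", 150.5):
    #         print(client.get_node_value("atomization_fast_speed"))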
def _check_connection(self) -> bool:
"""检查连接状态"""
try:
with self._client_lock:
if self.client:
# 尝试获取命名空间数组来验证连接
self.client.get_namespace_array()
return True
except Exception as e:
logger.warning(f"连接检查失败: {e}")
return False
return False
def _connection_monitor_worker(self):
"""连接监控线程工作函数"""
self._connection_monitor_running = True
logger.info(f"连接监控线程已启动 (检查间隔: {self._connection_check_interval}秒)")
reconnect_attempts = 0
max_reconnect_attempts = 5
while self._connection_monitor_running:
try:
# 检查连接状态
if not self._check_connection():
logger.warning("检测到连接断开,尝试重新连接...")
reconnect_attempts += 1
if reconnect_attempts <= max_reconnect_attempts:
try:
# 尝试重新连接
with self._client_lock:
if self.client:
try:
self.client.disconnect()
except:
pass
self.client.connect()
logger.info("✓ 重新连接成功")
# 重新设置订阅
if self._use_subscription:
self._setup_subscriptions()
reconnect_attempts = 0
except Exception as e:
logger.error(f"重新连接失败 (尝试 {reconnect_attempts}/{max_reconnect_attempts}): {e}")
time.sleep(5) # 重连失败后等待5秒
else:
logger.error(f"达到最大重连次数 ({max_reconnect_attempts}),停止重连")
self._connection_monitor_running = False
else:
# 连接正常,重置重连计数
reconnect_attempts = 0
except Exception as e:
logger.error(f"连接监控出错: {e}")
# 等待下次检查
time.sleep(self._connection_check_interval)
def _start_connection_monitor(self):
"""启动连接监控线程"""
if self._connection_monitor_thread is not None and self._connection_monitor_thread.is_alive():
logger.warning("连接监控线程已在运行")
return
import threading
self._connection_monitor_thread = threading.Thread(
target=self._connection_monitor_worker,
daemon=True,
name="OpcUaConnectionMonitor"
)
self._connection_monitor_thread.start()
def _stop_connection_monitor(self):
"""停止连接监控线程"""
self._connection_monitor_running = False
if self._connection_monitor_thread and self._connection_monitor_thread.is_alive():
self._connection_monitor_thread.join(timeout=2.0)
logger.info("连接监控线程已停止")
def read_node(self, node_name: str) -> str:
"""
读取节点值的便捷方法(使用缓存)
返回JSON格式字符串
"""
try:
# 使用get_node_value方法自动处理缓存
value = self.get_node_value(node_name, use_cache=True)
# 获取缓存信息
chinese_name = self._name_mapping.get(node_name, node_name)
cache_info = self._node_values.get(chinese_name, {})
result = {
"value": value,
"error": False,
"node_name": node_name,
"timestamp": time.time(),
"cache_age": time.time() - cache_info.get('timestamp', time.time()),
"source": cache_info.get('source', 'unknown')
}
return json.dumps(result)
except Exception as e:
logger.error(f"读取节点 {node_name} 失败: {e}")
result = {
"value": None,
"error": True,
"node_name": node_name,
"error_message": str(e),
"timestamp": time.time()
}
return json.dumps(result)
def get_cache_stats(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
current_time = time.time()
stats = {
'total_cached_nodes': len(self._node_values),
'subscription_nodes': 0,
'on_demand_nodes': 0,
'expired_nodes': 0,
'cache_timeout': self._cache_timeout,
'using_subscription': self._use_subscription
}
for node_name, cache_entry in self._node_values.items():
source = cache_entry.get('source', 'unknown')
cache_age = current_time - cache_entry['timestamp']
if source == 'subscription':
stats['subscription_nodes'] += 1
elif source in ['on_demand_read', 'forced_read', 'write']:
stats['on_demand_nodes'] += 1
if cache_age > self._cache_timeout:
stats['expired_nodes'] += 1
return stats
def print_cache_stats(self):
"""打印缓存统计信息"""
stats = self.get_cache_stats()
print("\n" + "="*80)
print("缓存统计信息")
print("="*80)
print(f"总缓存节点数: {stats['total_cached_nodes']}")
print(f"订阅模式: {'启用' if stats['using_subscription'] else '禁用'}")
print(f" - 订阅更新节点: {stats['subscription_nodes']}")
print(f" - 按需读取节点: {stats['on_demand_nodes']}")
print(f" - 已过期节点: {stats['expired_nodes']}")
print(f"缓存超时时间: {stats['cache_timeout']}")
print("="*80 + "\n")
def load_config(self, config_path: str) -> None:
"""从JSON配置文件加载并注册工作流"""
try:
with open(config_path, 'r', encoding='utf-8') as f:
config_data = json.load(f)
# 处理节点注册
if "register_node_list_from_csv_path" in config_data:
config_dir = os.path.dirname(os.path.abspath(config_path))
if "path" in config_data["register_node_list_from_csv_path"]:
csv_path = config_data["register_node_list_from_csv_path"]["path"]
if not os.path.isabs(csv_path):
csv_path = os.path.join(config_dir, csv_path)
config_data["register_node_list_from_csv_path"]["path"] = csv_path
self.register_node_list_from_csv_path(**config_data["register_node_list_from_csv_path"])
if self.client and self._variables_to_find:
logger.info("CSV加载完成开始查找服务器节点...")
self._find_nodes()
# 处理工作流创建
if "create_flow" in config_data:
self.create_workflow_from_json(config_data["create_flow"])
self.register_workflows_as_methods()
# 将所有节点注册为属性
self._register_nodes_as_attributes()
# 打印统计信息
found_count = len(self._node_registry)
total_count = len(self._variables_to_find)
if found_count < total_count:
logger.warning(f"节点查找完成:找到 {found_count}/{total_count} 个节点")
else:
logger.info(f"✓ 节点查找完成:所有 {found_count} 个节点均已找到")
# 如果使用订阅模式,重新设置订阅(确保新节点被订阅)
if self._use_subscription and found_count > 0:
self._setup_subscriptions()
logger.info(f"成功从 {config_path} 加载配置")
except Exception as e:
logger.error(f"加载配置文件 {config_path} 失败: {e}")
traceback.print_exc()
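    # A minimal config-file sketch for load_config (key names match the parsing
    # above; file names are hypothetical):
    #     {
    #         "register_node_list_from_csv_path": {"path": "nodes.csv"},
    #         "create_flow": [...]  # workflow dicts, see create_workflow_from_json
    #     }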
def disconnect(self):
"""断开连接并清理资源"""
logger.info("正在断开连接...")
# 停止连接监控
self._stop_connection_monitor()
# 删除订阅
if self._subscription:
try:
with self._client_lock:
self._subscription.delete()
logger.info("订阅已删除")
except Exception as e:
logger.warning(f"删除订阅失败: {e}")
# 断开客户端连接
if self.client:
try:
with self._client_lock:
self.client.disconnect()
logger.info("✓ OPC UA 客户端已断开连接")
except Exception as e:
logger.error(f"断开连接失败: {e}")
def _register_nodes_as_attributes(self):
"""将所有节点注册为实例属性"""
for node_name, node in self._node_registry.items():
if not node.node_id or node.node_id == "":
logger.warning(f"⚠ 节点 '{node_name}' 的 node_id 为空,跳过注册为属性")
continue
eng_name = self._reverse_mapping.get(node_name)
attr_name = eng_name if eng_name else node_name.replace(' ', '_').replace('-', '_')
def create_property_getter(node_key):
def getter(self):
return self.get_node_value(node_key, use_cache=True)
return getter
setattr(OpcUaClient, attr_name, property(create_property_getter(node_name)))
logger.debug(f"已注册节点 '{node_name}' 为属性 '{attr_name}'")
def post_init(self, ros_node):
"""ROS2 节点就绪后的初始化"""
if not (hasattr(self, 'deck') and self.deck):
return
if not (hasattr(ros_node, 'resource_tracker') and ros_node.resource_tracker):
logger.warning("resource_tracker 不存在,无法注册 deck")
return
# 1. 本地注册(必需)
ros_node.resource_tracker.add_resource(self.deck)
# 2. 上传云端
try:
from unilabos.ros.nodes.base_device_node import ROS2DeviceNode
ROS2DeviceNode.run_async_func(
ros_node.update_resource,
True,
resources=[self.deck]
)
logger.info("Deck 已上传到云端")
except Exception as e:
logger.error(f"上传失败: {e}")
if __name__ == '__main__':
    # Example usage:
    # create a client from a config file and auto-register its workflows
    import os
    current_dir = os.path.dirname(os.path.abspath(__file__))
    config_path = os.path.join(current_dir, "opcua_huairou.json")
    # Create the OPC UA client and load the config
    try:
        client = OpcUaClient(
            url="opc.tcp://192.168.1.88:4840/freeopcua/server/",  # replace with your OPC UA server address
            config_path=config_path  # config file resolved next to this module
        )
        # List all registered workflows
        print("\nRegistered workflows:")
        for workflow_name in client.workflow_name:
            print(f"  - {workflow_name}")
        # Exercise the trigger_grab_action workflow with English parameter names
        print("\nTesting the trigger_grab_action workflow (English parameter names):")
        client.trigger_grab_action(reaction_tank_number=2, raw_tank_number=2)
        # client.set_node_value("reaction_tank_number", 2)
        # Read node values via English node names
        grab_complete = client.get_node_value("grab_complete")
        reaction_tank = client.get_node_value("reaction_tank_number")
        raw_tank = client.get_node_value("raw_tank_number")
        print("\nPost-run status check (English node names):")
        print(f"  - grab complete: {grab_complete}")
        print(f"  - current reaction tank number: {reaction_tank}")
        print(f"  - current raw tank number: {raw_tank}")
        # Test writing a node value via an English node name
        print("\nTesting a node write (English node name):")
        success = client.set_node_value("atomization_fast_speed", 150.5)
        print(f"  - wrote atomization_fast_speed = 150.5, result: {success}")
        # Read back the written value
        atomization_speed = client.get_node_value("atomization_fast_speed")
        print(f"  - read back atomization_fast_speed: {atomization_speed}")
        # Disconnect
        client.disconnect()
    except Exception as e:
        print(f"Error: {e}")
        traceback.print_exc()