Uni-Lab-OS/unilabos/ros/msgs/message_converter.py
Junhan Chang a62a695812 Ready for open source (#47)
Co-authored-by: ck <xiaoyeqiannian@163.com>
Co-authored-by: 王俊杰 <1800011822@pku.edu.cn>
Co-authored-by: q434343 <554662886@qq.com>
Co-authored-by: Xuwznln <xuwznln@gmail.com>
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
Co-authored-by: wjjxxx <43375851+wjjxxx@users.noreply.github.com>
Co-authored-by: 3218923350 <105201755+3218923350@users.noreply.github.com>
Co-authored-by: Xuwznln <1023025701@qq.com>
Co-authored-by: 王俊杰 <2201110460@stu.pku.edu.cn>
Co-authored-by: jiawei <miaojiawei@dp.tech>
Co-authored-by: Jiawei <91898272+jiawei723@users.noreply.github.com>
Co-authored-by: ColumbiaCC <2100011801@stu.pku.edu.cn>
Co-authored-by: Harvey Que <Q-Query@outlook.com>
2025-04-17 15:09:58 +08:00

790 lines
27 KiB
Python

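"""
Message converter.

This module converts between Python objects (dataclasses, Pydantic models) and ROS message types.
It uses ImportManager to import and manage the required modules dynamically.
"""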
import json
import traceback
from io import StringIO
from typing import Iterable, Any, Dict, Type, TypeVar, Union
import yaml
from pydantic import BaseModel
from dataclasses import asdict, is_dataclass
from rosidl_parser.definition import UnboundedSequence, NamespacedType, BasicType, UnboundedString
from unilabos.utils import logger
from unilabos.utils.import_manager import ImportManager
from unilabos.config.config import ROSConfig
# Define generic type variables
T = TypeVar("T")
DataClassT = TypeVar("DataClassT")
# Get the list of modules to import from the configuration
ROS_MODULES = ROSConfig.modules
msg_converter_manager = ImportManager(ROS_MODULES)
"""geometry_msgs"""
Point = msg_converter_manager.get_class("geometry_msgs.msg:Point")
Pose = msg_converter_manager.get_class("geometry_msgs.msg:Pose")
"""std_msgs"""
Float64 = msg_converter_manager.get_class("std_msgs.msg:Float64")
Float64MultiArray = msg_converter_manager.get_class("std_msgs.msg:Float64MultiArray")
Int32 = msg_converter_manager.get_class("std_msgs.msg:Int32")
Int64 = msg_converter_manager.get_class("std_msgs.msg:Int64")
String = msg_converter_manager.get_class("std_msgs.msg:String")
Bool = msg_converter_manager.get_class("std_msgs.msg:Bool")
"""nav2_msgs"""
NavigateToPose = msg_converter_manager.get_class("nav2_msgs.action:NavigateToPose")
NavigateThroughPoses = msg_converter_manager.get_class("nav2_msgs.action:NavigateThroughPoses")
SingleJointPosition = msg_converter_manager.get_class("control_msgs.action:SingleJointPosition")
"""unilabos_msgs"""
Resource = msg_converter_manager.get_class("unilabos_msgs.msg:Resource")
SendCmd = msg_converter_manager.get_class("unilabos_msgs.action:SendCmd")
"""unilabos"""
imsg = msg_converter_manager.get_module("unilabos.messages")
Point3D = msg_converter_manager.get_class("unilabos.messages:Point3D")
# Basic message type mapping
_msg_mapping: Dict[Type, Type] = {
    float: Float64,
    list[float]: Float64MultiArray,
    int: Int32,
    str: String,
    bool: Bool,
    Point3D: Point,
}
# Action type mapping
_action_mapping: Dict[Type, Dict[str, Any]] = {
    float: {
        "type": SingleJointPosition,
        "goal": {"position": "position", "max_velocity": "max_velocity"},
        "feedback": {"position": "position"},
        "result": {},
    },
    str: {
        "type": SendCmd,
        "goal": {"command": "position"},
        "feedback": {"status": "status"},
        "result": {},
    },
    Point3D: {
        "type": NavigateToPose,
        "goal": {"pose.pose.position": "position"},
        "feedback": {
            "current_pose.pose.position": "position",
            "navigation_time.sec": "time_spent",
            "estimated_time_remaining.sec": "time_remaining",
        },
        "result": {},
    },
    list[Point3D]: {
        "type": NavigateThroughPoses,
        "goal": {"poses[].pose.position": "positions[]"},
        "feedback": {
            "current_pose.pose.position": "position",
            "navigation_time.sec": "time_spent",
            "estimated_time_remaining.sec": "time_remaining",
            "number_of_poses_remaining": "pose_number_remaining",
        },
        "result": {},
    },
}
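
The goal and feedback entries above map dotted ROS field paths (for example `pose.pose.position`) to flat Python keys. The part of the file shown here does not include the code that consumes these paths, so the following is only a minimal sketch of one way such a path could be resolved. `set_by_path` and `fill_goal` are hypothetical helpers, `SimpleNamespace` stands in for the real `nav2_msgs` goal type, and the `[]` list syntax is not handled.

```python
from types import SimpleNamespace
from typing import Any, Dict


def set_by_path(target: Any, path: str, value: Any) -> None:
    """Assign value to a nested attribute named by a dotted path (hypothetical helper)."""
    *parents, leaf = path.split(".")
    for name in parents:
        # Walk down one attribute at a time until the parent of the leaf is reached.
        target = getattr(target, name)
    setattr(target, leaf, value)


def fill_goal(goal: Any, field_map: Dict[str, str], values: Dict[str, Any]) -> Any:
    """Copy values into goal, following a {ros_path: python_key} mapping."""
    for ros_path, py_key in field_map.items():
        if py_key in values:
            set_by_path(goal, ros_path, values[py_key])
    return goal


# Stand-in for a NavigateToPose goal; the real type comes from nav2_msgs.
goal = SimpleNamespace(pose=SimpleNamespace(pose=SimpleNamespace(position=None)))
fill_goal(goal, {"pose.pose.position": "position"}, {"position": (1.0, 2.0, 0.0)})
print(goal.pose.pose.position)  # (1.0, 2.0, 0.0)
```

Keeping the paths as plain strings means a new action type only needs a new dictionary entry, not new conversion code.
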
# Add Protocol action types to the mapping
for py_msgtype in imsg.__all__:
    if py_msgtype not in _action_mapping and py_msgtype.endswith("Protocol"):
        try:
            protocol_class = msg_converter_manager.get_class(f"unilabos.messages.{py_msgtype}")
            action_name = py_msgtype.replace("Protocol", "")
            action_type = msg_converter_manager.get_class(f"unilabos_msgs.action.{action_name}")
            if action_type:
                _action_mapping[protocol_class] = {
                    "type": action_type,
                    "goal": {k: k for k in action_type.Goal().get_fields_and_field_types().keys()},
                    # Time-like feedback fields are read through their `.sec` member
                    "feedback": {
                        (k if "time" not in k else f"{k}.sec"): k
                        for k in action_type.Feedback().get_fields_and_field_types().keys()
                    },
                    "result": {k: k for k in action_type.Result().get_fields_and_field_types().keys()},
                }
        except Exception:
            logger.debug(f"Failed to load Protocol class: {py_msgtype}")
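
The loop above builds identity mappings for goal and result fields, but rewrites any feedback field whose name contains `time` so that it is read through `.sec`. The sketch below isolates that key rule so it can be tried without ROS installed. `build_feedback_map` and the field names are hypothetical; in the real module the names come from `action_type.Feedback().get_fields_and_field_types()`.

```python
from typing import Dict, Iterable


def build_feedback_map(field_names: Iterable[str]) -> Dict[str, str]:
    """Map ROS feedback paths to Python keys, reading `.sec` from time-like fields."""
    return {(name if "time" not in name else f"{name}.sec"): name for name in field_names}


# Hypothetical feedback fields used only for illustration.
feedback_map = build_feedback_map(["progress", "time_spent", "status"])
print(feedback_map)
# {'progress': 'progress', 'time_spent.sec': 'time_spent', 'status': 'status'}
```

Note that the rule is a substring test, so any field whose name merely contains `time` is treated as a duration-like message.
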
# Python-to-ROS message converters
_msg_converter: Dict[Type, Any] = {
    float: float,
    Float64: lambda x: Float64(data=float(x)),
    Float64MultiArray: lambda x: Float64MultiArray(data=[float(i) for i in x]),
    int: int,
    Int32: lambda x: Int32(data=int(x)),
    Int64: lambda x: Int64(data=int(x)),
    bool: bool,
    Bool: lambda x: Bool(data=bool(x)),
    str: str,
    String: lambda x: String(data=str(x)),
    Point: lambda x: Point(x=x.x, y=x.y, z=x.z),
    # Resource dicts: optional keys fall back to empty values, config/data are stored as JSON strings
    Resource: lambda x: Resource(
        id=x["id"],
        name=x["name"],
        sample_id=x.get("sample_id", "") or "",
        children=list(x.get("children", [])),
        parent=x.get("parent", "") or "",
        type=x["type"],
        category=x.get("class", "") or x["type"],
        pose=(
            Pose(position=Point(x=float(x["position"]["x"]), y=float(x["position"]["y"]), z=float(x["position"]["z"])))
            if x.get("position", None) is not None
            else Pose()
        ),
        config=json.dumps(x.get("config", {})),
        data=json.dumps(x.get("data", {})),
    ),
}
def json_or_yaml_loads(data: str) -> Any:
    """Parse data as JSON, falling back to YAML; re-raise the JSON error if both fail."""
    try:
        return json.loads(data)
    except Exception as e:
        try:
            return yaml.safe_load(StringIO(data))
        except Exception:
            pass
        raise e
# ROS-to-Python message converters
_msg_converter_back: Dict[Type, Any] = {
    float: float,
    Float64: lambda x: x.data,
    Float64MultiArray: lambda x: x.data,
    int: int,
    Int32: lambda x: x.data,
    Int64: lambda x: x.data,
    bool: bool,
    Bool: lambda x: x.data,
    str: str,
    String: lambda x: x.data,
    Point: lambda x: Point3D(x=x.x, y=x.y, z=x.z),
    # Empty strings become None again; config/data are parsed back from their string form
    Resource: lambda x: {
        "id": x.id,
        "name": x.name,
        "sample_id": x.sample_id if x.sample_id else None,
        "children": list(x.children),
        "parent": x.parent if x.parent else None,
        "type": x.type,
        "class": x.category,
        "position": {"x": x.pose.position.x, "y": x.pose.position.y, "z": x.pose.position.z},
        "config": json_or_yaml_loads(x.config or "{}"),
        "data": json_or_yaml_loads(x.data or "{}"),
    },
}
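
The two `Resource` converters above are meant to be inverses: optional keys become empty strings and JSON text on the way into ROS, then `None` and parsed objects on the way back. The following is a reduced sketch of that round trip. `FakeResource`, `to_msg`, and `from_msg` are hypothetical stand-ins covering only a few fields, and the sketch uses `json.loads` alone instead of the JSON-or-YAML fallback.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FakeResource:
    """Stand-in for unilabos_msgs.msg.Resource with only a few fields."""
    id: str = ""
    parent: str = ""
    children: List[str] = field(default_factory=list)
    config: str = "{}"


def to_msg(d: Dict[str, Any]) -> FakeResource:
    """Dict to message: None or missing values collapse to empty defaults."""
    return FakeResource(
        id=d["id"],
        parent=d.get("parent", "") or "",
        children=list(d.get("children", [])),
        config=json.dumps(d.get("config", {})),
    )


def from_msg(m: FakeResource) -> Dict[str, Any]:
    """Message to dict: empty strings turn back into None, JSON text into objects."""
    return {
        "id": m.id,
        "parent": m.parent if m.parent else None,
        "children": list(m.children),
        "config": json.loads(m.config or "{}"),
    }


original = {"id": "plate_1", "parent": None, "config": {"rows": 8}}
restored = from_msg(to_msg(original))
print(restored)
# {'id': 'plate_1', 'parent': None, 'children': [], 'config': {'rows': 8}}
```

One consequence of this design is that a missing key and an explicit `None` cannot be told apart after a round trip, since both travel as an empty string.
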
# Message data type mapping
_msg_data_mapping: Dict[str, Type] = {
    "double": float,
    "float": float,
    "int": int,
    "bool": bool,
    "str": str,
}
def compare_model_fields(cls1: Any, cls2: Any) -> bool:
    """Check whether two classes have the same set of field names."""
    def get_class_fields(cls: Any) -> set:
        if hasattr(cls, "__annotations__"):
            return set(cls.__annotations__.keys())
        else:
            return set(cls.__dict__.keys())
    fields1 = get_class_fields(cls1)
    fields2 = get_class_fields(cls2)
    return fields1 == fields2
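
`compare_model_fields` compares field names only, not field types. The short example below repeats the function so it can run on its own and applies it to three hypothetical dataclasses (`PointA`, `PointB`, `Point2D`) that are not part of the module.

```python
from dataclasses import dataclass
from typing import Any


def compare_model_fields(cls1: Any, cls2: Any) -> bool:
    """Same logic as above: compare the sets of annotated field names."""
    def get_class_fields(cls: Any) -> set:
        if hasattr(cls, "__annotations__"):
            return set(cls.__annotations__.keys())
        return set(cls.__dict__.keys())
    return get_class_fields(cls1) == get_class_fields(cls2)


@dataclass
class PointA:
    x: float
    y: float
    z: float


@dataclass
class PointB:
    # Same field names as PointA, different types.
    x: int
    y: int
    z: int


@dataclass
class Point2D:
    x: float
    y: float


same_names = compare_model_fields(PointA, PointB)
different_names = compare_model_fields(PointA, Point2D)
print(same_names, different_names)  # True False
```

Because types are ignored, two unrelated classes that happen to share field names are treated as equivalent by the fallback lookups in `get_msg_type` and `get_action_type`.
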
def get_msg_type(datatype: Type) -> Type:
    """
    Get the ROS message type that corresponds to a Python data type.
    Args:
        datatype: Python data type, Pydantic model, or dataclass
    Returns:
        The corresponding ROS message type
    Raises:
        ValueError: If the message type is not supported
    """
    # Match known types directly
    if isinstance(datatype, type) and datatype in _msg_mapping:
        return _msg_mapping[datatype]
    # Fall back to matching by field comparison
    for k, v in _msg_mapping.items():
        if compare_model_fields(k, datatype):
            return v
    raise ValueError(f"Unsupported message type: {datatype}")
def get_action_type(datatype: Type) -> Dict[str, Any]:
    """
    Get the ROS action type that corresponds to a Python data type.
    Args:
        datatype: Python data type, Pydantic model, or dataclass
    Returns:
        The corresponding ROS action type configuration
    Raises:
        ValueError: If the action type is not supported
    """
    # Match known types directly
    if isinstance(datatype, type) and datatype in _action_mapping:
        return _action_mapping[datatype]
    # Fall back to matching by field comparison
    for k, v in _action_mapping.items():
        if compare_model_fields(k, datatype):
            return v
    raise ValueError(f"Unsupported action type: {datatype}")
def get_ros_type_by_msgname(msgname: str) -> Type:
    """
    Get a ROS type by its message name.
    Args:
        msgname: ROS message name in the format 'package_name/(action,msg,srv)/TypeName'
    Returns:
        The corresponding ROS type
    Raises:
        ValueError: If the ROS message name is invalid
        ImportError: If the type cannot be loaded
    """
    parts = msgname.split("/")
    if len(parts) != 3 or parts[1] not in ("action", "msg", "srv"):
        raise ValueError(
            f"Invalid ROS message name: {msgname}. Format should be 'package_name/(action,msg,srv)/TypeName'"
        )
    package_name, msg_type, type_name = parts
    full_module_path = f"{package_name}.{msg_type}"
    try:
        # Try to get the class through ImportManager
        return msg_converter_manager.get_class(f"{full_module_path}.{type_name}")
    except KeyError:
        # Fall back to a dynamic import
        try:
            msg_converter_manager.load_module(full_module_path)
            return msg_converter_manager.get_class(f"{full_module_path}.{type_name}")
        except Exception as e:
            raise ImportError(f"Failed to load type {type_name}. Make sure the package is installed.") from e


def _extract_data(obj: Any) -> Dict[str, Any]:
    """Extract an object's data as a dictionary."""
    if is_dataclass(obj) and not isinstance(obj, type) and hasattr(obj, "__dataclass_fields__"):
        return asdict(obj)
    elif isinstance(obj, BaseModel):
        return obj.model_dump()
    elif isinstance(obj, dict):
        return obj
    else:
        return {"data": obj}
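
# Illustration (not part of the original module): a minimal sketch of the
# normalization step above, showing how a dataclass instance, a dict and a
# bare scalar all end up as a dict. GripperCommand and extract_data are
# hypothetical names, and the Pydantic branch is omitted here to avoid a
# third-party dependency.

```python
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict


def extract_data(obj: Any) -> Dict[str, Any]:
    """Normalize supported inputs to a plain dict."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return asdict(obj)  # dataclass instance -> dict of its fields
    if isinstance(obj, dict):
        return obj  # already a dict, returned as-is (not copied)
    return {"data": obj}  # scalars are wrapped like std_msgs-style messages


@dataclass
class GripperCommand:  # hypothetical payload type
    position: float
    max_effort: float


from_dataclass = extract_data(GripperCommand(position=0.5, max_effort=10.0))
from_dict = extract_data({"position": 0.5})
from_scalar = extract_data(42)
```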


def convert_to_ros_msg(ros_msg_type: Union[Type, Any], obj: Any) -> Any:
    """
    Convert a Python object to a ROS message instance.

    Args:
        ros_msg_type: Target ROS message type
        obj: Python object (primitive, dataclass, or Pydantic instance)

    Returns:
        ROS message instance
    """
    # Try a predefined converter first; on failure, fall through to the generic path
    try:
        if isinstance(ros_msg_type, type) and ros_msg_type in _msg_converter:
            return _msg_converter[ros_msg_type](obj)
    except Exception as e:
        logger.error(f"Converter error: {ros_msg_type} -> {obj}: {e}")
        traceback.print_exc()

    # Create the ROS message instance
    ros_msg = ros_msg_type() if isinstance(ros_msg_type, type) else ros_msg_type

    # Extract the data
    data = _extract_data(obj)

    # Copy the data into the ROS message
    for key, value in data.items():
        if hasattr(ros_msg, key):
            attr = getattr(ros_msg, key)
            if isinstance(attr, (float, int, str, bool)):
                setattr(ros_msg, key, value)
            elif isinstance(attr, (list, tuple)) and isinstance(value, Iterable):
                setattr(ros_msg, key, list(value))
            else:
                nested_ros_msg = convert_to_ros_msg(type(attr)(), value)
                setattr(ros_msg, key, nested_ros_msg)
        else:
            # Skip fields that do not exist on the message to avoid errors
            continue

    return ros_msg


def convert_to_ros_msg_with_mapping(ros_msg_type: Type, obj: Any, value_mapping: Dict[str, str]) -> Any:
    """
    Convert a Python object to a ROS message according to a field mapping.

    Args:
        ros_msg_type: Target ROS message type
        obj: Python object
        value_mapping: Dictionary mapping message field paths to attribute names

    Returns:
        ROS message instance
    """
    # Create the ROS message instance
    ros_msg = ros_msg_type() if isinstance(ros_msg_type, type) else ros_msg_type

    # Extract the data
    data = _extract_data(obj)

    # Process each field according to the mapping
    for msg_name, attr_name in value_mapping.items():
        msg_path = msg_name.split(".")
        attr_base = attr_name.rstrip("[]")

        if attr_base not in data:
            continue

        value = data[attr_base]
        if value is None:
            continue

        try:
            if not attr_name.endswith("[]"):
                # Single-value mapping, e.g. {"pose.position": "position"}
                current = ros_msg
                for i, name in enumerate(msg_path[:-1]):
                    current = getattr(current, name)

                last_field = msg_path[-1]
                field_type = type(getattr(current, last_field))
                setattr(current, last_field, convert_to_ros_msg(field_type, value))
            else:
                # List-value mapping, e.g. {"poses[].position": "positions[]"}
                if not isinstance(value, Iterable) or isinstance(value, (str, dict)):
                    continue

                items = list(value)
                if not items:
                    continue

                # Array mappings are only supported for simple paths
                if len(msg_path) <= 2:
                    array_field = msg_path[0]
                    if hasattr(ros_msg, array_field):
                        if len(msg_path) == 1:
                            # Set the array directly
                            setattr(ros_msg, array_field, items)
                        else:
                            # Set an attribute on each array element
                            target_field = msg_path[1]
                            array_items = getattr(ros_msg, array_field)

                            # Grow the array until its size matches
                            while len(array_items) < len(items):
                                # Append a new element of the same type as the first one
                                if array_items:
                                    elem_type = type(array_items[0])
                                    array_items.append(elem_type())
                                else:
                                    # Stop when the element type cannot be determined
                                    break

                            # Set the attribute on each element
                            for i, val in enumerate(items):
                                if i < len(array_items):
                                    setattr(array_items[i], target_field, val)
        except Exception as e:
            # Ignore mapping errors
            logger.debug(f"Mapping error for {msg_name} -> {attr_name}: {str(e)}")
            continue

    return ros_msg


def convert_from_ros_msg(msg: Any) -> Any:
    """
    Recursively convert a ROS message object to a Python object.

    Args:
        msg: ROS message instance

    Returns:
        Python object (dictionary or primitive)
    """
    # Use a predefined converter if one exists
    if type(msg) in _msg_converter_back:
        return _msg_converter_back[type(msg)](msg)

    # Handle standard ROS messages
    elif hasattr(msg, "__slots__") and hasattr(msg, "_fields_and_field_types"):
        result = {}
        for field in msg.__slots__:
            field_value = getattr(msg, field)
            field_name = field[1:] if field.startswith("_") else field
            result[field_name] = convert_from_ros_msg(field_value)
        return result

    # Handle lists and tuples
    elif isinstance(msg, (list, tuple)):
        return [convert_from_ros_msg(item) for item in msg]

    # Return primitives unchanged
    else:
        return msg
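
# Illustration (not part of the original module): a minimal sketch of the
# recursive conversion above, run against stand-in classes so that it works
# without a ROS installation. FakePoint and FakePath are hypothetical; they
# only imitate the `__slots__` / `_fields_and_field_types` shape that the
# function checks for, and the predefined-converter lookup is left out.

```python
from typing import Any


class FakePoint:
    """Hypothetical stand-in for a generated ROS 2 message class."""

    __slots__ = ["_x", "_y"]
    _fields_and_field_types = {"x": "double", "y": "double"}

    def __init__(self, x: float = 0.0, y: float = 0.0):
        self._x = x
        self._y = y


class FakePath:
    """Hypothetical message holding a string and a list of nested messages."""

    __slots__ = ["_name", "_points"]
    _fields_and_field_types = {"name": "string", "points": "sequence<FakePoint>"}

    def __init__(self, name: str, points: list):
        self._name = name
        self._points = points


def to_python(msg: Any) -> Any:
    """Recursively turn message-like objects into dicts, lists and primitives."""
    if hasattr(msg, "__slots__") and hasattr(msg, "_fields_and_field_types"):
        # Strip the leading underscore that generated slots carry
        return {
            (slot[1:] if slot.startswith("_") else slot): to_python(getattr(msg, slot))
            for slot in msg.__slots__
        }
    if isinstance(msg, (list, tuple)):
        return [to_python(item) for item in msg]
    return msg


path = FakePath("demo", [FakePoint(1.0, 2.0), FakePoint(3.0, 4.0)])
result = to_python(path)
```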


def convert_from_ros_msg_with_mapping(ros_msg: Any, value_mapping: Dict[str, str]) -> Dict[str, Any]:
    """
    Convert a ROS message to a Python dictionary according to a field mapping.

    Args:
        ros_msg: ROS message instance
        value_mapping: Dictionary mapping message field paths to attribute names

    Returns:
        Python dictionary
    """
    data: Dict[str, Any] = {}

    for msg_name, attr_name in value_mapping.items():
        msg_path = msg_name.split(".")
        current = ros_msg

        try:
            if not attr_name.endswith("[]"):
                # Single-value mapping
                for name in msg_path:
                    current = getattr(current, name)
                data[attr_name] = convert_from_ros_msg(current)
            else:
                # List-value mapping
                for name in msg_path:
                    if name.endswith("[]"):
                        base_name = name[:-2]
                        if hasattr(current, base_name):
                            current = list(getattr(current, base_name))
                        else:
                            current = []
                            break
                    else:
                        if isinstance(current, list):
                            next_level = []
                            for item in current:
                                if hasattr(item, name):
                                    next_level.append(getattr(item, name))
                            current = next_level
                        elif hasattr(current, name):
                            current = getattr(current, name)
                        else:
                            current = []
                            break

                attr_key = attr_name[:-2]
                if current:
                    data[attr_key] = [convert_from_ros_msg(item) for item in current]
        except (AttributeError, TypeError):
            logger.debug(f"Mapping conversion error for {msg_name} -> {attr_name}")
            continue

    return data
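
# Illustration (not part of the original module): a minimal sketch of the
# single-value branch above, where a dotted message path such as
# "pose.position.x" is walked with getattr and stored under the mapped key.
# read_by_mapping is a hypothetical helper and SimpleNamespace stands in for
# a real ROS message. The list ("[]") branch is not covered here.

```python
from types import SimpleNamespace
from typing import Any, Dict


def read_by_mapping(msg: Any, mapping: Dict[str, str]) -> Dict[str, Any]:
    """Collect values from `msg` by following each dotted path in `mapping`."""
    out: Dict[str, Any] = {}
    for msg_path, key in mapping.items():
        current = msg
        try:
            for name in msg_path.split("."):
                current = getattr(current, name)
        except AttributeError:
            # Paths that do not resolve are skipped rather than raising
            continue
        out[key] = current
    return out


msg = SimpleNamespace(
    pose=SimpleNamespace(position=SimpleNamespace(x=1.0, y=2.0, z=3.0)),
    status="ok",
)
mapping = {"pose.position.x": "x", "status": "state", "missing.field": "ignored"}
values = read_by_mapping(msg, mapping)
```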
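

def set_msg_data(dtype_str: str, data: Any) -> Any:
    """
    Cast data to the Python type named by a message data type string.

    Args:
        dtype_str: Message data type string; unknown names fall back to str
        data: Data to convert

    Returns:
        The converted data
    """
    converter = _msg_data_mapping.get(dtype_str, str)
    return converter(data)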
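
# Illustration (not part of the original module): a minimal sketch of the
# lookup-with-fallback cast above. DTYPE_CASTS and cast_value are hypothetical
# names that mirror the mapping table. Two behaviours are worth noting:
# an unknown type name falls back to str, and bool() on any non-empty
# string (including "false") is True.

```python
from typing import Any, Callable, Dict

DTYPE_CASTS: Dict[str, Callable[[Any], Any]] = {
    "double": float,
    "float": float,
    "int": int,
    "bool": bool,
    "str": str,
}


def cast_value(dtype: str, data: Any) -> Any:
    """Cast `data` with the converter registered for `dtype`, defaulting to str."""
    return DTYPE_CASTS.get(dtype, str)(data)


as_float = cast_value("double", "3.5")
as_int = cast_value("int", "7")
fallback = cast_value("uint8", 7)  # "uint8" is not in the table, so str is used
truthy = cast_value("bool", "false")  # non-empty string, so bool() gives True
```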


"""
ROS Action to JSON Schema converter

This module converts ROS Action definitions to JSON Schema,
which is used to standardize Action interfaces and generate documentation.
"""

import json
import yaml
from typing import Any, Dict, Type, Union, Optional

from unilabos.utils import logger
from unilabos.utils.import_manager import ImportManager
from unilabos.config.config import ROSConfig

basic_type_map = {
    'bool': {'type': 'boolean'},
    'int8': {'type': 'integer', 'minimum': -128, 'maximum': 127},
    'uint8': {'type': 'integer', 'minimum': 0, 'maximum': 255},
    'int16': {'type': 'integer', 'minimum': -32768, 'maximum': 32767},
    'uint16': {'type': 'integer', 'minimum': 0, 'maximum': 65535},
    'int32': {'type': 'integer', 'minimum': -2147483648, 'maximum': 2147483647},
    'uint32': {'type': 'integer', 'minimum': 0, 'maximum': 4294967295},
    'int64': {'type': 'integer'},
    'uint64': {'type': 'integer', 'minimum': 0},
    'double': {'type': 'number'},
    'float32': {'type': 'number'},
    'float64': {'type': 'number'},
    'string': {'type': 'string'},
    'char': {'type': 'string', 'maxLength': 1},
    'byte': {'type': 'integer', 'minimum': 0, 'maximum': 255},
}
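
# Illustration (not part of the original module): the integer bounds in the
# table above follow from the bit width of each type. A signed n-bit integer
# spans [-2**(n-1), 2**(n-1) - 1] and an unsigned one spans [0, 2**n - 1].
# int_bounds is a hypothetical helper that reproduces those numbers.

```python
from typing import Tuple


def int_bounds(bits: int, signed: bool) -> Tuple[int, int]:
    """Return (minimum, maximum) for a two's-complement integer of `bits` bits."""
    if signed:
        return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
    return 0, 2 ** bits - 1


int8_bounds = int_bounds(8, signed=True)
uint16_bounds = int_bounds(16, signed=False)
int32_bounds = int_bounds(32, signed=True)
uint32_bounds = int_bounds(32, signed=False)
```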


def ros_field_type_to_json_schema(type_info: Union[Type, str], slot_type: Optional[str] = None) -> Dict[str, Any]:
    """
    Convert a ROS field type to a JSON Schema type definition.

    Args:
        type_info: ROS type (a rosidl type descriptor, a type name string, or a message class)
        slot_type: ROS type string of the field (currently unused)

    Returns:
        The corresponding JSON Schema type definition
    """
    if isinstance(type_info, UnboundedSequence):
        return {
            'type': 'array',
            'items': ros_field_type_to_json_schema(type_info.value_type)
        }
    if isinstance(type_info, NamespacedType):
        cls_name = ".".join(type_info.namespaces) + ":" + type_info.name
        type_class = msg_converter_manager.get_class(cls_name)
        return ros_field_type_to_json_schema(type_class)
    elif isinstance(type_info, BasicType):
        return ros_field_type_to_json_schema(type_info.typename)
    elif isinstance(type_info, UnboundedString):
        return basic_type_map['string']
    elif isinstance(type_info, str):
        if type_info in basic_type_map:
            return basic_type_map[type_info]

        # Handle time and duration types
        if type_info in ('time', 'duration', 'builtin_interfaces/Time', 'builtin_interfaces/Duration'):
            return {
                'type': 'object',
                'properties': {
                    'sec': {'type': 'integer', 'description': 'seconds'},
                    'nanosec': {'type': 'integer', 'description': 'nanoseconds'}
                },
                'required': ['sec', 'nanosec']
            }
    else:
        return ros_message_to_json_schema(type_info)
    # # Handle array types
    # if field_type.endswith('[]'):
    #     item_type = field_type[:-2]
    #     return {
    #         'type': 'array',
    #         'items': ros_field_type_to_json_schema(item_type)
    #     }

    # # Handle complex types (try to load and process them)
    # try:
    #     # If it is a full message type spec (package_name/msg/TypeName)
    #     if '/' in field_type:
    #         msg_class = get_ros_type_by_msgname(field_type)
    #         return ros_message_to_json_schema(msg_class)
    #     else:
    #         # Possibly a relative reference or a simple name
    #         return {'type': 'object', 'description': f'Composite type: {field_type}'}
    # except Exception as e:
    #     # If it cannot be resolved, return a generic object type
    #     logger.debug(f"Could not resolve type {field_type}: {str(e)}")
    #     return {'type': 'object', 'description': f'Unknown type: {field_type}'}


def ros_message_to_json_schema(msg_class: Any) -> Dict[str, Any]:
    """
    Convert a ROS message class to a JSON Schema.

    Args:
        msg_class: ROS message class

    Returns:
        The corresponding JSON Schema definition
    """
    schema = {
        'type': 'object',
        'properties': {},
        'required': []
    }

    # Use the class name as the title
    if hasattr(msg_class, '__name__'):
        schema['title'] = msg_class.__name__

    # Read the message's fields and field types
    try:
        for ind, slot_info in enumerate(msg_class._fields_and_field_types.items()):
            slot_name, slot_type = slot_info
            type_info = msg_class.SLOT_TYPES[ind]
            field_schema = ros_field_type_to_json_schema(type_info, slot_type)
            schema['properties'][slot_name] = field_schema
            schema['required'].append(slot_name)
        # if hasattr(msg_class, 'get_fields_and_field_types'):
        #     fields_and_types = msg_class.get_fields_and_field_types()
        #
        #     for field_name, field_type in fields_and_types.items():
        #         # Convert the ROS field type to JSON Schema
        #         field_schema = ros_field_type_to_json_schema(field_type)
        #
        #         schema['properties'][field_name] = field_schema
        #         schema['required'].append(field_name)
        # elif hasattr(msg_class, '__slots__') and hasattr(msg_class, '_fields_and_field_types'):
        #     # Read directly from the instance attributes
        #     for field_name in msg_class.__slots__:
        #         # Strip the leading underscore, if any
        #         clean_name = field_name[1:] if field_name.startswith('_') else field_name
        #
        #         # Look up the type in _fields_and_field_types
        #         if clean_name in msg_class._fields_and_field_types:
        #             field_type = msg_class._fields_and_field_types[clean_name]
        #             field_schema = ros_field_type_to_json_schema(field_type)
        #
        #             schema['properties'][clean_name] = field_schema
        #             schema['required'].append(clean_name)
    except Exception as e:
        # If reading the field types fails, record the error in the schema
        schema['description'] = f"Error while parsing message fields: {str(e)}"
        logger.error(f"Failed to parse fields of message {msg_class.__name__}: {str(e)}")

    return schema


def ros_action_to_json_schema(action_class: Any) -> Dict[str, Any]:
    """
    Convert a ROS Action class to a JSON Schema.

    Args:
        action_class: ROS Action class

    Returns:
        The complete JSON Schema definition
    """
    if not hasattr(action_class, 'Goal') or not hasattr(action_class, 'Feedback') or not hasattr(action_class, 'Result'):
        raise ValueError(f"{action_class.__name__} is not a valid ROS Action class")

    # Build the base schema
    schema = {
        '$schema': 'http://json-schema.org/draft-07/schema#',
        'title': action_class.__name__,
        'description': f"JSON Schema for ROS Action {action_class.__name__}",
        'type': 'object',
        'properties': {
            'goal': {
                'description': 'Action goal - sent from the client to the server',
                **ros_message_to_json_schema(action_class.Goal)
            },
            'feedback': {
                'description': 'Action feedback - sent from the server to the client during execution',
                **ros_message_to_json_schema(action_class.Feedback)
            },
            'result': {
                'description': 'Action result - sent from the server to the client on completion',
                **ros_message_to_json_schema(action_class.Result)
            }
        },
        'required': ['goal']
    }

    return schema


def convert_ros_action_to_jsonschema(
    action_name_or_type: Union[str, Type],
    output_file: Optional[str] = None,
    format: str = 'json'
) -> Dict[str, Any]:
    """
    Convert a ROS Action type to a JSON Schema and optionally save it to a file.

    Args:
        action_name_or_type: ROS Action type name or class
        output_file: Optional path of the file to write the JSON Schema to
        format: Output format, 'json' or 'yaml'

    Returns:
        JSON Schema definition (dictionary)
    """
    # Resolve the input argument
    if isinstance(action_name_or_type, str):
        # If it is a string, try to load the Action type
        action_type = get_ros_type_by_msgname(action_name_or_type)
    else:
        action_type = action_name_or_type

    # Generate the JSON Schema
    schema = ros_action_to_json_schema(action_type)

    # If an output file is given, save the schema to it
    if output_file:
        if format.lower() == 'json':
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(schema, f, indent=2, ensure_ascii=False)
        elif format.lower() == 'yaml':
            with open(output_file, 'w', encoding='utf-8') as f:
                yaml.safe_dump(schema, f, default_flow_style=False, allow_unicode=True)
        else:
            raise ValueError(f"Unsupported format: {format}; use 'json' or 'yaml'")

    return schema


# Example usage
if __name__ == "__main__":
    # Example: convert the NavigateToPose action
    try:
        from nav2_msgs.action import NavigateToPose

        # Convert to JSON Schema and print it
        schema = convert_ros_action_to_jsonschema(NavigateToPose)
        print(json.dumps(schema, indent=2, ensure_ascii=False))

        # Save to a file
        # convert_ros_action_to_jsonschema(NavigateToPose, "navigate_to_pose_schema.json")

        # Or use the action name as a string
        # schema = convert_ros_action_to_jsonschema("nav2_msgs/action/NavigateToPose")
    except ImportError:
        print("Could not import the NavigateToPose action. Make sure the relevant ROS packages are installed.")