From 7f40f141f67b24a4c1540b977559006db918da24 Mon Sep 17 00:00:00 2001 From: Xuwznln <18435084+Xuwznln@users.noreply.github.com> Date: Sat, 11 Oct 2025 03:11:17 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E5=8A=A8=E5=86=85=E9=83=A8action?= =?UTF-8?q?=E4=BB=A5=E5=85=BC=E5=AE=B9host=20node?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/developer_guide/add_yaml.md | 4 +- unilabos/registry/registry.py | 1 + unilabos/ros/nodes/base_device_node.py | 106 ++++++++++++------------ unilabos/ros/nodes/presets/host_node.py | 4 +- 4 files changed, 58 insertions(+), 57 deletions(-) diff --git a/docs/developer_guide/add_yaml.md b/docs/developer_guide/add_yaml.md index 73e5e101..d53dcf6e 100644 --- a/docs/developer_guide/add_yaml.md +++ b/docs/developer_guide/add_yaml.md @@ -111,8 +111,8 @@ new_device: # 设备名,要唯一 1. 以 `auto-` 开头的动作:从你 Python 类的方法自动生成 2. 通用的驱动动作: - - `_execute_driver_command`:同步执行驱动命令 - - `_execute_driver_command_async`:异步执行驱动命令 + - `_execute_driver_command`:同步执行驱动命令(仅本地可用) + - `_execute_driver_command_async`:异步执行驱动命令(仅本地可用) ### 如果要手动定义动作 diff --git a/unilabos/registry/registry.py b/unilabos/registry/registry.py index 6df35a9a..dc4d2446 100644 --- a/unilabos/registry/registry.py +++ b/unilabos/registry/registry.py @@ -55,6 +55,7 @@ class Registry: "ResourceCreateFromOuterEasy", "host_node", f"动作 create_resource" ) self.EmptyIn = self._replace_type_with_class("EmptyIn", "host_node", f"") + self.StrSingleInput = self._replace_type_with_class("StrSingleInput", "host_node", f"") self.device_type_registry = {} self.device_module_to_registry = {} self.resource_type_registry = {} diff --git a/unilabos/ros/nodes/base_device_node.py b/unilabos/ros/nodes/base_device_node.py index 36fe0ee6..39e99293 100644 --- a/unilabos/ros/nodes/base_device_node.py +++ b/unilabos/ros/nodes/base_device_node.py @@ -1078,6 +1078,57 @@ class BaseROS2DeviceNode(Node, Generic[T]): return execute_callback + def _execute_driver_command(self, string: str): + try: + target = json.loads(string) + except Exception as ex: + try: + target = yaml.safe_load(io.StringIO(string)) + except Exception as ex2: + raise JsonCommandInitError( + f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}" + ) + try: + function_name = target["function_name"] + function_args = target["function_args"] + assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" + function = getattr(self.driver_instance, function_name) + assert callable( + function + ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" + return function(**function_args) + except KeyError as ex: + raise JsonCommandInitError( + f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}" + ) + + async def _execute_driver_command_async(self, string: str): + try: + target = json.loads(string) + except Exception as ex: + try: + target = yaml.safe_load(io.StringIO(string)) + except Exception as ex2: + raise JsonCommandInitError( + f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}" + ) + try: + function_name = target["function_name"] + function_args = target["function_args"] + assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" + function = getattr(self.driver_instance, function_name) + assert callable( + function + ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" + assert asyncio.iscoroutinefunction( + function + ), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}" + return await function(**function_args) + except KeyError as ex: + raise JsonCommandInitError( + f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}" + ) + # 异步上下文管理方法 async def __aenter__(self): """进入异步上下文""" @@ -1244,65 +1295,14 @@ class ROS2DeviceNode: self._ros_node: BaseROS2DeviceNode self._ros_node.lab_logger().info(f"初始化完成 {self._ros_node.uuid} {self.driver_is_ros}") self.driver_instance._ros_node = self._ros_node # type: ignore - self.driver_instance._execute_driver_command = self._execute_driver_command # type: ignore - self.driver_instance._execute_driver_command_async = self._execute_driver_command_async # type: ignore + self.driver_instance._execute_driver_command = self._ros_node._execute_driver_command # type: ignore + self.driver_instance._execute_driver_command_async = self._ros_node._execute_driver_command_async # type: ignore if hasattr(self.driver_instance, "post_init"): try: self.driver_instance.post_init(self._ros_node) # type: ignore except Exception as e: self._ros_node.lab_logger().error(f"设备后初始化失败: {e}") - def _execute_driver_command(self, string: str): - try: - target = json.loads(string) - except Exception as ex: - try: - target = yaml.safe_load(io.StringIO(string)) - except Exception as ex2: - raise JsonCommandInitError( - f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}" - ) - try: - function_name = target["function_name"] - function_args = target["function_args"] - assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" - function = getattr(self.driver_instance, function_name) - assert callable( - function - ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" - return function(**function_args) - except KeyError as ex: - raise JsonCommandInitError( - f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}" - ) - - async def _execute_driver_command_async(self, string: str): - try: - target = json.loads(string) - except Exception as ex: - try: - target = yaml.safe_load(io.StringIO(string)) - except Exception as ex2: - raise JsonCommandInitError( - f"执行动作时JSON/YAML解析失败: \n{ex}\n{ex2}\n原内容: {string}\n{traceback.format_exc()}" - ) - try: - function_name = target["function_name"] - function_args = target["function_args"] - assert isinstance(function_args, dict), "执行动作时JSON必须为dict类型\n原JSON: {string}" - function = getattr(self.driver_instance, function_name) - assert callable( - function - ), f"执行动作时JSON中的function_name对应的函数不可调用: {function_name}\n原JSON: {string}" - assert asyncio.iscoroutinefunction( - function - ), f"执行动作时JSON中的function并非异步: {function_name}\n原JSON: {string}" - return await function(**function_args) - except KeyError as ex: - raise JsonCommandInitError( - f"执行动作时JSON缺少function_name或function_args: {ex}\n原JSON: {string}\n{traceback.format_exc()}" - ) - def _start_loop(self): def run_event_loop(): loop = asyncio.new_event_loop() diff --git a/unilabos/ros/nodes/presets/host_node.py b/unilabos/ros/nodes/presets/host_node.py index e28c2930..0c139b89 100644 --- a/unilabos/ros/nodes/presets/host_node.py +++ b/unilabos/ros/nodes/presets/host_node.py @@ -160,13 +160,13 @@ class HostNode(BaseROS2DeviceNode): ), "/devices/host_node/_execute_driver_command": ActionClient( self, - lab_registry.EmptyIn, + lab_registry.StrSingleInput, "/devices/host_node/_execute_driver_command", callback_group=self.callback_group, ), "/devices/host_node/_execute_driver_command_async": ActionClient( self, - lab_registry.EmptyIn, + lab_registry.StrSingleInput, "/devices/host_node/_execute_driver_command_async", callback_group=self.callback_group, ),